From 8f03e719fa14890282d02a4acc216cd3b63db0f3 Mon Sep 17 00:00:00 2001 From: JohnDoee Date: Tue, 17 Nov 2015 22:41:14 +0100 Subject: [PATCH] restructured plugin --- streaming/core.py | 764 +++++++++++++++++++----------------- streaming/data/config.glade | 60 ++- streaming/data/streaming.js | 25 +- streaming/filelike.py | 2 + streaming/gtkui.py | 66 +++- 5 files changed, 537 insertions(+), 380 deletions(-) diff --git a/streaming/core.py b/streaming/core.py index 9266d7d..6e41904 100644 --- a/streaming/core.py +++ b/streaming/core.py @@ -40,25 +40,23 @@ import base64 import json import logging -import math import os -import time import urllib +import deluge.configmanager + from collections import defaultdict -from datetime import datetime, timedelta + +from deluge import component +from deluge._libtorrent import lt +from deluge.core.rpcserver import export +from deluge.plugins.pluginbase import CorePluginBase from twisted.internet import reactor, defer, task from twisted.python import randbytes -from twisted.web import server, resource, static, http -from twisted.web.static import StaticProducer - -import deluge.component as component -import deluge.configmanager -from deluge.core.rpcserver import export -from deluge.log import LOG as log -from deluge.plugins.pluginbase import CorePluginBase +from twisted.web import server, resource, static, http, client +from .filelike import FilelikeObjectResource from .resource import Resource logger = logging.getLogger(__name__) @@ -68,22 +66,17 @@ DEFAULT_PREFS = { 'port': 46123, 'allow_remote': False, 'reset_complete': True, + 'use_stream_urls': True, + 'auto_open_stream_urls': False, 'remote_username': 'username', 'remote_password': 'password', } -from .filelike import FilelikeObjectResource -MIN_QUEUE_CHUNKS = 6 -EXPECTED_PERCENT = 0.3 -EXPECTED_SIZE = 5*1024*1024 -HANDLERS_TIMEOUT = timedelta(hours=12) - -class UnknownTorrentException(Exception): - pass - -class UnknownFileException(Exception): - pass +def sleep(seconds): + d = defer.Deferred() + reactor.callLater(seconds, d.callback, seconds) + return d class FileServeResource(resource.Resource): isLeaf = True @@ -106,35 +99,12 @@ class FileServeResource(resource.Resource): if key not in self.file_mapping: return resource.NoResource().render(request) - v = self.file_mapping[key] - if isinstance(v, static.File): - return v.render_GET(request) + f = self.file_mapping[key] + if f.is_complete(): + return static.File(f.full_path).render_GET(request) else: - tf = v.copy() - tf.open() - return FilelikeObjectResource(tf, tf.size).render_GET(request) - -class AddTorrentResource(Resource): - isLeaf = True - - def __init__(self, client, *args, **kwargs): - self.client = client - Resource.__init__(self, *args, **kwargs) - - @defer.inlineCallbacks - def render_POST(self, request): - torrent_data = request.args.get('torrent_data', None) - if not torrent_data: - defer.returnValue(json.dumps({'status': 'error', 'message': 'missing torrent_data in request'})) - - torrent_data = torrent_data[0].encode('base64') - - torrent_id = yield self.client.add_torrent(torrent_data) - - if torrent_id is None: - defer.returnValue(json.dumps({'status': 'error', 'message': 'failed to add torrent'})) - - defer.returnValue(json.dumps({'status': 'success', 'infohash': torrent_id, 'message': 'torrent added successfully'})) + tfr = f.open() + return FilelikeObjectResource(tfr, f.size).render_GET(request) class StreamResource(Resource): isLeaf = True @@ -157,310 +127,392 @@ class StreamResource(Resource): result = yield self.client.stream_torrent(infohash[0], path[0]) defer.returnValue(json.dumps(result)) -class TorrentFile(object): - file_handler = None - def __init__(self, torrent_handler, file_path, torrent_file_path, size, chunk_size, offset, file_index): - self.torrent_handler = torrent_handler - self.file_path = self.path = file_path - self.torrent_file_path = torrent_file_path - self.first_chunk = offset / chunk_size - self.last_chunk = (offset + size) / chunk_size - self.chunk_size = chunk_size - self.offset = offset - self.file_index = file_index - self.size = size - self.last_requested_chunk = self.first_chunk - self.is_closed = False - self.is_active = False - self.last_activity = datetime.now() - self.end_chunks = [] +class UnknownTorrentException(Exception): + pass + +class UnknownFileException(Exception): + pass + +class TorrentFileReader(object): + def __init__(self, torrent_file): + self.torrent_file = torrent_file + self.size = torrent_file.size + self.position = 0 - self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset - - self.torrent_handler.torrent_files[torrent_file_path].append(self) - - def open(self): - if self.is_closed: - raise IOError('Unable to reopen file') - - self.file_handler = open(self.file_path, 'rb') - - def get_chunk(self, tell): - i = (tell + 1) - self.first_chunk_end - if i <= 0: - offset = 0 - else: - offset = (i / self.chunk_size) + 1 - - return self.first_chunk + offset, self.first_chunk_end + (offset * self.chunk_size) - - def prepare_torrent(self, buffer_pieces): - self.torrent_handler.schedule_chunk(self.first_chunk, 0) - self.torrent_handler.schedule_chunk(self.last_chunk, 0) - - self.end_chunks.append(self.last_chunk) - - if self.first_chunk != self.last_chunk: - self.torrent_handler.schedule_chunk(self.last_chunk-1, 0) - self.end_chunks.append(self.last_chunk-1) - - for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:min(self.first_chunk+buffer_pieces, self.last_chunk)+1], self.first_chunk): - self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk) + self.waiting_for_piece = None + self.current_piece = None + self.current_piece_data = None @defer.inlineCallbacks def read(self, size=1024): - self.is_active = True - self.last_activity = datetime.now() + required_piece, read_position = self.torrent_file.get_piece_info(self.position) - tell = self.tell() - chunk, end_of_chunk = self.get_chunk(tell) - self.last_requested_chunk = chunk + if self.current_piece != required_piece: + logger.debug('We are missing piece %i and it is required, requesting' % (required_piece, )) + self.waiting_for_piece = required_piece + self.current_piece_data = yield self.torrent_file.get_piece_data(required_piece) + self.current_piece = required_piece + self.waiting_for_piece = None - logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell)) - yield self.wait_chunk_complete(chunk) - logger.debug('done waiting %s, %s, %s' % (chunk, size, tell)) + logger.debug('We can read from local piece from %s size %s from position %s' % (read_position, size, self.position)) + data = self.current_piece_data[read_position:read_position+size] + self.position += len(data) - defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size))) - - def wait_chunk_complete(self, chunk): - d = defer.Deferred() - - def check_if_done(): - if self.torrent_handler.torrent.status.pieces[chunk]: - return d.callback(True) - - self.torrent_handler.schedule_chunk(chunk, 0) - - if self.is_closed: - logger.debug('The file closed, shutting down torrent') - return - - reactor.callLater(1.0, check_if_done) - - check_if_done() - - return d - - def seek(self, offset, whence=os.SEEK_SET): - return self.file_handler.seek(offset, whence) + defer.returnValue(data) def tell(self): - return self.file_handler.tell() + return self.position def close(self): - if not self.is_closed and self in self.torrent_handler.torrent_files[self.torrent_file_path]: - self.torrent_handler.torrent_files[self.torrent_file_path].remove(self) - self.is_closed = True - - if self.file_handler: - return self.file_handler.close() + self.torrent_file.close(self) - def copy(self): - tf = TorrentFile(self.torrent_handler, self.file_path, self.torrent_file_path, - self.size, self.chunk_size, self.offset, self.file_index) - return tf + def seek(self, offset, whence=os.SEEK_SET): + self.position = offset -class TorrentHandler(object): - def __init__(self, torrent, torrent_id, core): +class TorrentFile(object): # can be read from, knows about itself + def __init__(self, torrent, first_piece, last_piece, piece_size, offset, path, full_path, size, index): self.torrent = torrent - self.torrent_handle = torrent.handle - self.torrent_id = torrent_id - self.core = core - self.priorities = [0] * len(torrent.get_status(['files'])['files']) - self.torrent_files = defaultdict(list) - self.priorities_increased = {} - # need to blackhole all pieces not downloaded yet - self.last_activity = datetime.now() - self.update_chunk_priorities() + self.first_piece = first_piece + self.last_piece = last_piece + self.piece_size = piece_size + self.offset = offset + self.path = path + self.size = size + self.full_path = full_path + self.index = index + + self.file_requested = False + self.do_shutdown = False + self.first_piece_end = self.piece_size * (self.first_piece + 1) - offset + self.waiting_pieces = {} + self.current_readers = [] + self.registered_alert = False + + self.alerts = component.get("AlertManager") - def is_buffered(self, first_chunk, last_chunk): - buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x] - logger.debug('Current buffer status for file (%s): %r' % (first_chunk, buffer_status)) - if buffer_status: - return False - else: - return True + def open(self): + """ + Returns a filelike object + """ + if not self.registered_alert: + self.alerts.register_handler("read_piece_alert", self.on_alert_got_piece_data) + self.registered_alert = True + tfr = TorrentFileReader(self) + self.current_readers.append(tfr) + self.file_requested = False + return tfr - def update_priorities(self): - self.torrent.set_file_priorities(self.priorities) + def close(self, tfr): + self.current_readers.remove(tfr) + self.torrent.unprioritize_pieces(tfr) - def schedule_chunk(self, chunk, distance): - if chunk not in self.priorities_increased: - logger.debug('Scheduled chunk %s at distance %s' % (chunk, distance)) - - self.torrent_handle.piece_priority(chunk, (7 if distance <= 4 else 6)) - self.torrent_handle.set_piece_deadline(chunk, 700*(distance+1)) - - self.priorities_increased[chunk] = True + def is_complete(self): + torrent_status = self.torrent.torrent.get_status(['file_progress', 'state']) + file_progress = torrent_status['file_progress'] + return file_progress[self.index] == 1.0 - def update_chunk_priority(self, tfs): - handled_heads = [] - for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True): - if datetime.now() - tf.last_activity > HANDLERS_TIMEOUT: - logger.debug('Torrentfile timed out') - tf.close() - continue - - if tf.last_requested_chunk is not None: - if handled_heads: - if tf.last_requested_chunk in handled_heads: - continue - - already_queued = False - for i in sorted(handled_heads): - if i > tf.last_requested_chunk: - if 0 not in self.torrent.status.pieces[tf.last_requested_chunk:i]: - already_queued = True - break - if already_queued: - continue - - offset = tf.last_requested_chunk + 1 - - current_buffer_offset = 5 - for chunk in tf.end_chunks: - if not self.torrent.status.pieces[chunk]: - logger.info('End chunks are not downloaded, setting buffer offset differently') - current_buffer_offset = 20 - break - - currently_downloading = set() - for peer in self.torrent.handle.get_peer_info(): - if peer.downloading_piece_index != -1: - currently_downloading.add(peer.downloading_piece_index) - - status_increase_count = 0 - current_buffer = 0 - found_buffer_end = False - for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:tf.last_chunk+1], offset): - if not chunk_status and chunk not in currently_downloading: - if not found_buffer_end: - logger.debug('Found buffer end at %s, have a buffer of %s' % (chunk, current_buffer)) - found_buffer_end = True - - if status_increase_count <= MIN_QUEUE_CHUNKS: - self.schedule_chunk(chunk, chunk-offset) - elif self.torrent_handle.piece_priority(chunk) == 0: - self.torrent_handle.piece_priority(chunk, 1) - - status_increase_count += 1 - elif not found_buffer_end and chunk not in currently_downloading: - current_buffer += 1 - - if status_increase_count >= max(MIN_QUEUE_CHUNKS, current_buffer-current_buffer_offset): - break - - handled_heads.append(tf.last_requested_chunk) - - return bool(handled_heads) - - def update_chunk_priorities(self): # TODO: check if torrent still exists - start_time = time.time() - file_progress = self.torrent.get_status(['file_progress'])['file_progress'] - incomplete_files = False - - for torrent_file_path, tfs in self.torrent_files.items(): - if not tfs: - logger.debug('No heads left for %r' % torrent_file_path) - del self.torrent_files[torrent_file_path] - continue - - tf = tfs[0] - if file_progress[tf.file_index] == 1.0: - logger.info('%s is already complete, skipping' % torrent_file_path) - continue - - if self.update_chunk_priority(tfs): - self.last_activity = datetime.now() - - incomplete_files = True - - if not incomplete_files and self.core.config['reset_complete'] and not all(self.priorities): - logger.info('We are not doing any file streamings, but not downloading all files, changing that.') - self.priorities = [1] * len(self.priorities) - self.update_priorities() - - if datetime.now() - self.last_activity > HANDLERS_TIMEOUT: - logger.debug('Torrent handler idle, killing myself.') - if self.torrent_id in self.core.torrent_handlers: - del self.core.torrent_handlers[self.torrent_id] + def get_piece_info(self, tell): + return divmod((self.offset + tell), self.piece_size) + + def on_alert_got_piece_data(self, alert): + torrent_id = str(alert.handle.info_hash()) + if torrent_id != self.torrent.infohash: return - logger.debug('Took %s seconds to handle a loop' % (time.time() - start_time)) - reactor.callLater(1, self.update_chunk_priorities) - - def blackhole_all_pieces(self, first_chunk, last_chunk): - for chunk in range(first_chunk, last_chunk+1): - self.torrent_handle.piece_priority(chunk, 0) + logger.debug('Got piece data for piece %s' % alert.piece) + if alert.piece not in self.waiting_pieces: + logger.debug('Got data for piece %i, but no data needed for this piece?' % alert.piece) + return + # TODO: check piece size is not zero + self.waiting_pieces[alert.piece].callback(alert.buffer) @defer.inlineCallbacks - def get_file(self, filepath_or_index): - status = self.torrent.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path']) - pieces = self.torrent.status.pieces - piece_length = status['piece_length'] - files = status['files'] + def get_piece_data(self, piece): + logger.debug('Trying to get piece data for piece %s' % piece) + for reader in self.current_readers: + if reader.current_piece == piece: + defer.returnValue(reader.current_piece_data) - for i, f, priority, progress in zip(range(len(files)), files, status['file_priorities'], status['file_progress']): - if i == filepath_or_index or f['path'] == filepath_or_index: - f['first_piece'] = f['offset'] / piece_length - f['last_piece'] = (f['offset'] + f['size']) / piece_length - f['pieces'] = pieces[f['first_piece']:f['last_piece']+1] - f['priority'] = priority - f['progress'] = progress - - break - else: - raise UnknownFileException() + if piece not in self.waiting_pieces: + self.waiting_pieces[piece] = defer.Deferred() - fp = os.path.join(status['save_path'], f['path']) - - if progress == 1: # file is complete, no need to fire up all the torrent jazz - defer.returnValue(static.File(fp)) - - self.priorities = [0] * len(self.priorities) - self.priorities[f['index']] = 3 - current_tfs = [] - for tfs in self.torrent_files.values(): - if not tfs: - continue - tf = tfs[0] - self.priorities[tf.file_index] = 3 - current_tfs.append((tf, self.torrent_handle.piece_priorities()[tf.first_chunk:tf.last_chunk+1])) - - self.update_priorities() - - for tf, chunk_status in current_tfs: - logger.info('Setting chunks to old status') - for i, chunk in enumerate(chunk_status, tf.first_chunk): - logger.debug('Setting status on chunk %s back to %s' % (i, chunk)) - self.torrent_handle.piece_priority(i, chunk) - - self.torrent.resume() - - percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT)) - size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces'])) - expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream. - - tf = TorrentFile(self, fp, f['path'], f['size'], status['piece_length'], f['offset'], f['index']) - - if len(self.torrent_files[f['path']]) == 1: - self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk) - - tf.prepare_torrent(expected_pieces) - - for _ in range(300): - if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces) and self.is_buffered(tf.last_chunk, tf.last_chunk): - break - + logger.debug('Waiting for %s' % piece) + self.torrent.schedule_piece(self, piece, 0) + while not self.torrent.torrent.handle.have_piece(piece): + if self.do_shutdown: + raise Exception() + logger.debug('Did not have piece %i, waiting' % piece) + self.torrent.unrelease() yield sleep(1) - defer.returnValue(tf) + self.torrent.torrent.handle.read_piece(piece) + + data = yield self.waiting_pieces[piece] + if piece in self.waiting_pieces: + del self.waiting_pieces[piece] + logger.debug('Done waiting for piece %i, returning data' % piece) + defer.returnValue(data) + + def shutdown(self): + #self.alerts.deregister_handler(self.on_alert_torrent_finished) + self.do_shutdown = True -def sleep(seconds): - d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) - return d +class Torrent(object): + def __init__(self, torrent_handler, infohash): + self.infohash = infohash + self.torrent = component.get("TorrentManager").torrents.get(infohash, None) + self.torrent_handler = torrent_handler + + if not self.torrent: + raise UnknownTorrentException('%s is not a known infohash' % infohash) + + self.torrent_files = None + self.priority_increased = defaultdict(set) + self.do_shutdown = False + self.torrent_released = False # set to True if all the files are set to download + + + self.populate_files() + self.file_priorities = [0] * len(self.torrent_files) + + self.last_piece = self.torrent_files[-1].last_piece + self.torrent.handle.set_sequential_download(True) + reactor.callLater(0, self.update_piece_priority) + reactor.callLater(0, self.blackhole_all_pieces, 0, self.last_piece) + + def populate_files(self): + self.torrent_files = [] + + status = self.torrent.get_status(['piece_length', 'files', 'save_path']) + piece_length = status['piece_length'] + files = status['files'] + save_path = status['save_path'] + + for f in files: + first_piece = f['offset'] / piece_length + last_piece = (f['offset'] + f['size']) / piece_length + full_path = os.path.join(save_path, f['path']) + self.torrent_files.append(TorrentFile(self, first_piece, last_piece, piece_length, f['offset'], + f['path'], full_path, f['size'], f['index'])) + + return files + + def find_file(self, file_or_index=None): + best_file = None + biggest_file_size = 0 + + for i, f in enumerate(self.torrent_files): + if file_or_index is not None: + if i == file_or_index or f.path == file_or_index: + best_file = f + break + else: + if f.size > biggest_file_size: + best_file = f + + return best_file + + def get_file(self, file_or_index=None): + f = self.find_file(file_or_index) + if f is None: + raise UnknownFileException('Was unable to find %s' % file_or_index) + + return f + + def unprioritize_pieces(self, tfr): + logger.debug('Unprioritizing pieces for %s' % tfr) + currently_downloading = self.get_currently_downloading() + + for piece, increased_by in self.priority_increased.items(): + if tfr in increased_by: + increased_by.remove(tfr) + if not increased_by and piece not in currently_downloading and not self.torrent.status.pieces[piece]: + logger.debug('Unprioritizing piece %s' % piece) + self.torrent.handle.piece_priority(piece, 0) + + def get_currently_downloading(self): + currently_downloading = set() + for peer in self.torrent.handle.get_peer_info(): + if peer.downloading_piece_index != -1: + currently_downloading.add(peer.downloading_piece_index) + + return currently_downloading + + def blackhole_all_pieces(self, first_piece, last_piece): + currently_downloading = self.get_currently_downloading() + + logger.debug('Blacklisting pieces from %i to %i skipping %r' % (first_piece, last_piece, currently_downloading)) + for piece in range(first_piece, last_piece+1): + if piece not in currently_downloading and not self.torrent.status.pieces[piece]: + if piece in self.priority_increased: + continue + logger.debug('Setting piece priority %s to blacklist' % piece) + self.torrent.handle.piece_priority(piece, 0) + + def unrelease(self): + if self.torrent_released: + logger.debug('Unreleasing %s' % self.infohash) + self.torrent_released = False + self.torrent.set_file_priorities(self.file_priorities) + self.blackhole_all_pieces(0, self.last_piece) + + def get_torrent_file(self, file_or_index): + f = self.get_file(file_or_index) + f.file_requested = True + + self.torrent.resume() + + should_update_priorities = False + if self.file_priorities[f.index] == 0: + self.file_priorities[f.index] = 3 + should_update_priorities = True + + if self.torrent_released: + should_update_priorities = True + + if should_update_priorities and not f.is_complete(): # Need to do this stuff on seek too + self.unrelease() + + return f + + def shutdown(self): + logger.info('Shutting down torrent %s' % self.infohash) + self.do_shutdown = True + + for tf in self.torrent_files: + tf.shutdown() + + def schedule_piece(self, torrent_file, piece, distance): + if torrent_file not in self.priority_increased[piece]: + if not self.priority_increased[piece]: + self.priority_increased[piece].add(torrent_file) + + logger.debug('Scheduled piece %s at distance %s' % (piece, distance)) + + self.torrent.handle.piece_priority(piece, (7 if distance <= 4 else 6)) + self.torrent.handle.set_piece_deadline(piece, 700*(distance+1)) + + self.priority_increased[piece].add(torrent_file) + + def do_pieces_schedule(self, torrent_file, currently_downloading, from_piece): + logger.debug('Looking for stuff to do with pieces for file %s from piece %s' % (torrent_file, from_piece)) + + priority_increased = 0 + chain_size = 0 + download_chain_size = 0 + end_of_chain = False + + current_buffer_offset = 5 + if self.torrent.status.pieces[torrent_file.last_piece]: + if torrent_file.first_piece != torrent_file.last_piece and self.torrent.status.pieces[torrent_file.last_piece-1] \ + or torrent_file.first_piece == torrent_file.last_piece: + current_buffer_offset = 20 + + for piece, status in enumerate(self.torrent.status.pieces[from_piece:torrent_file.last_piece+1], from_piece): + if not end_of_chain: + if status: + chain_size += 1 + elif piece in currently_downloading: + download_chain_size += 1 + + if not status and piece not in currently_downloading: + if not end_of_chain: + status_increase = max(11, chain_size-current_buffer_offset) + end_of_chain = True + + priority_increased += 1 + if priority_increased >= status_increase: + logger.debug('Done increasing priority for %i pieces' % status_increase) + break + + self.schedule_piece(torrent_file, piece, piece-from_piece) + else: + logger.info('We are done with the rest of this chain, we might be able to increase others') + return True + + return False + + def update_piece_priority(self): # if all do_pieces_schedule returns true, allow all pices of file to be downloaded or whole torernt + if self.do_shutdown: + return + + logger.debug('Updating piece priority for %s' % self.infohash) + + currently_downloading = set() + for peer in self.torrent.handle.get_peer_info(): + if peer.downloading_piece_index != -1: + currently_downloading.add(peer.downloading_piece_index) + + all_heads_done = True + for f in self.torrent_files: + if not f.file_requested and not f.current_readers: + continue + + logger.debug('Rescheduling file %s' % f.path) + + if f.file_requested: + all_heads_done &= self.do_pieces_schedule(f, currently_downloading, f.first_piece) + self.schedule_piece(f, f.last_piece, 0) + if f.first_piece != f.last_piece: + self.schedule_piece(f, f.last_piece-1, 1) + + for tfr in f.current_readers: + if tfr.waiting_for_piece is not None: + logger.debug('Scheduling based on waiting for piece %s' % tfr.waiting_for_piece) + all_heads_done &= self.do_pieces_schedule(f, currently_downloading, tfr.waiting_for_piece) + elif tfr.current_piece is not None: + logger.debug('Scheduling based on current piece %s' % tfr.current_piece) + all_heads_done &= self.do_pieces_schedule(f, currently_downloading, tfr.current_piece) + + if all_heads_done and not self.torrent_released: + logger.debug('We are already done with all heads, figuring out what to do next') + + if self.torrent_handler.config['reset_complete']: + self.torrent_released = True + logger.debug('Resetting all disabled files') + file_priorities = [(1 if fp == 0 else fp) for fp in self.file_priorities] + self.torrent.set_file_priorities(file_priorities) + + if not all_heads_done and self.torrent_released: + logger.debug('Seems like the torrent was released too early') + self.unrelease() + + reactor.callLater(0.3, self.update_piece_priority) + +class TorrentHandler(object): + def __init__(self, config): + self.torrents = {} + self.config = config + + self.alerts = component.get("AlertManager") + self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed) + + def get_stream(self, infohash, file_or_index=None): + logger.info('Trying to stream infohash %s and file %s' % (infohash, file_or_index)) + if infohash not in self.torrents: + self.torrents[infohash] = Torrent(self, infohash) + + return self.torrents[infohash].get_torrent_file(file_or_index) + + def on_alert_torrent_removed(self, alert): + try: + torrent_id = str(alert.handle.info_hash()) + except (RuntimeError, KeyError): + logger.warning('Failed to handle on torrent remove alert') + return + + if torrent_id not in self.torrents: + return + + self.torrents[torrent_id].shutdown() + del self.torrents[torrent_id] + + def shutdown(self): + logger.debug('Shutting down TorrentHandler') + self.alerts.deregister_handler(self.on_alert_torrent_removed) + for torrent in self.torrents.values(): + torrent.shutdown() class Core(CorePluginBase): def enable(self): @@ -470,9 +522,6 @@ class Core(CorePluginBase): self.resource = Resource() self.resource.putChild('file', self.fsr) if self.config['allow_remote']: - self.resource.putChild('add_torrent', AddTorrentResource(username=self.config['remote_username'], - password=self.config['remote_password'], - client=self)) self.resource.putChild('stream', StreamResource(username=self.config['remote_username'], password=self.config['remote_password'], client=self)) @@ -487,7 +536,7 @@ class Core(CorePluginBase): except AttributeError: logger.warning('Unable to exclude partial pieces') - self.torrent_handlers = {} + self.torrent_handler = TorrentHandler(self.config) try: self.listening = reactor.listenTCP(self.config['port'], self.site, interface=self.config['ip']) @@ -497,6 +546,7 @@ class Core(CorePluginBase): @defer.inlineCallbacks def disable(self): self.site.stopFactory() + self.torrent_handler.shutdown() yield self.listening.stopListening() def update(self): @@ -506,7 +556,6 @@ class Core(CorePluginBase): @defer.inlineCallbacks def set_config(self, config): """Sets the config dictionary""" - do_reload = False for key in config.keys(): self.config[key] = config[key] self.config.save() @@ -521,43 +570,36 @@ class Core(CorePluginBase): @export @defer.inlineCallbacks - def add_torrent(self, torrent_data): - core = component.get("Core") - tid = yield core.add_torrent_file('file.torrent', torrent_data, {'add_paused': True}) + def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None): + tor = component.get("TorrentManager").torrents.get(infohash, None) - tor = component.get("TorrentManager").torrents.get(tid, None) - - state = tor.get_status(['files']) - tor.set_file_priorities([0] * len(state['files'])) - - defer.returnValue(tid) - - def get_torrent_handler(self, tid): - if tid not in self.torrent_handlers: - tor = component.get("TorrentManager").torrents.get(tid, None) + if tor is None: + logger.info('Did not find torrent, must add it') - if tor is None: - raise UnknownTorrentException() + if not filedump and url: + filedump = yield client.getPage(url) - self.torrent_handlers[tid] = TorrentHandler(tor, tid, self) + if not filedump: + defer.returnValue({'status': 'error', 'message': 'unable to find torrent, provide infohash, url or filedump'}) + + torrent_info = lt.torrent_info(lt.bdecode(filedump)) + infohash = str(torrent_info.info_hash()) - return self.torrent_handlers[tid] - - @export - @defer.inlineCallbacks - def stream_torrent(self, tid, filepath_or_index): + core = component.get("Core") + try: + yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True}) + except: + defer.returnValue({'status': 'error', 'message': 'failed to add torrent'}) + try: - torrent_handler = yield self.get_torrent_handler(tid) - except UnknownTorrentException: # torrent isn't added yet - defer.returnValue({'status': 'error', 'message': 'torrent_not_found'}) - - try: - tf = yield torrent_handler.get_file(filepath_or_index) - except UnknownFileException: - defer.returnValue({'status': 'error', 'message': 'file_not_found'}) + tf = self.torrent_handler.get_stream(infohash, filepath_or_index) + except UnknownTorrentException: + defer.returnValue({'status': 'error', 'message': 'unable to find torrent, probably failed to add it'}) defer.returnValue({ 'status': 'success', + 'use_stream_urls': self.config['use_stream_urls'], + 'auto_open_stream_urls': self.config['auto_open_stream_urls'], 'url': 'http://%s:%s/file/%s/%s' % (self.config.config['ip'], self.config.config['port'], - self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path))) + self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path))) }) \ No newline at end of file diff --git a/streaming/data/config.glade b/streaming/data/config.glade index 5d0a43f..a061b44 100644 --- a/streaming/data/config.glade +++ b/streaming/data/config.glade @@ -33,6 +33,28 @@ + + + True + 0 + Use stream urls: + + + 2 + + + + + + True + 0 + Auto-open stream urls: + + + 3 + + + True @@ -40,7 +62,7 @@ Reset "do not download" when streamed file is complete: - 2 + 4 @@ -51,7 +73,7 @@ Allow remote control: - 3 + 5 @@ -62,7 +84,7 @@ Remote username: - 4 + 6 @@ -73,7 +95,7 @@ Remote password: - 5 + 7 @@ -108,7 +130,7 @@ - + True True @@ -118,7 +140,7 @@ - + True True @@ -128,7 +150,7 @@ - + True True @@ -137,6 +159,26 @@ + + + True + True + + + 5 + + + + + + True + True + + + 6 + + + True @@ -144,13 +186,13 @@ True - 5 + 7 - 1 + 7 diff --git a/streaming/data/streaming.js b/streaming/data/streaming.js index facb73a..0dae3a0 100644 --- a/streaming/data/streaming.js +++ b/streaming/data/streaming.js @@ -68,6 +68,18 @@ PreferencePage = Ext.extend(Ext.Panel, { fieldLabel: 'IP' })); + om.bind('use_stream_urls', fieldset.add({ + xtype: 'checkbox', + name: 'use_stream_urls', + fieldLabel: 'Use StreamProtocol urls', + })); + + om.bind('auto_open_stream_urls', fieldset.add({ + xtype: 'checkbox', + name: 'auto_open_stream_urls', + fieldLabel: 'AutoOpen StreamProtocol urls', + })); + om.bind('reset_complete', fieldset.add({ xtype: 'checkbox', name: 'reset_complete', @@ -150,10 +162,19 @@ StreamingPlugin = Ext.extend(Deluge.Plugin, { var fileIndex = nodes[0].attributes.fileIndex; var tid = files.torrentId; if (fileIndex >= 0) { - deluge.client.streaming.stream_torrent(tid, fileIndex, { + deluge.client.streaming.stream_torrent(tid, null, null, fileIndex, { success: function (result) { + console.log('Got result', result); if (result.status == 'success') { - Ext.Msg.alert('Stream ready', 'URL for stream: ' + result.url + ''); + var url = result.url; + if (result.use_stream_urls) { + url = 'stream+' + url; + if (result.auto_open_stream_urls) { + window.location.assign(url); + return; + } + } + Ext.Msg.alert('Stream ready', 'URL for stream: ' + url + ''); } else { Ext.Msg.alert('Stream failed', 'Error message: ' + result.message); } diff --git a/streaming/filelike.py b/streaming/filelike.py index acef851..ea4ff65 100644 --- a/streaming/filelike.py +++ b/streaming/filelike.py @@ -68,6 +68,7 @@ class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer): if done: self.request.unregisterProducer() self.request.finish() + self.stopProducing() self.request = None class FilelikeObjectResource(static.File): @@ -141,6 +142,7 @@ class FilelikeObjectResource(static.File): producer = self.makeProducer(request, self.fileObject) if request.method == 'HEAD': + self.fileObject.close() return '' producer.start() diff --git a/streaming/gtkui.py b/streaming/gtkui.py index 38008c0..cdf9ce5 100644 --- a/streaming/gtkui.py +++ b/streaming/gtkui.py @@ -37,7 +37,9 @@ # statement from all source files in the program, then also delete it here. # +import json import gtk +import webbrowser from deluge.log import LOG as log from deluge.ui.client import client @@ -46,8 +48,37 @@ from deluge.plugins.pluginbase import GtkPluginBase import deluge.component as component import deluge.common +from twisted.internet import reactor, defer, threads +from twisted.web import server, resource + from common import get_resource +class LocalAddResource(resource.Resource): + gtkui = None + isLeaf = True + + def __init__(self, gtkui): + self.gtkui = gtkui + resource.Resource.__init__(self) + + def render_GET(self, request): + useragent = request.getHeader('User-Agent') + if 'Deluge-Streamer' not in useragent: + request.setResponseCode(401) + return 'Unauthorized' + + torrent_url = request.args.get('url', None) + if not torrent_url: + return json.dumps({'status': 'error', 'message': 'missing url in request'}) + + torrent_file = request.args.get('file', None) + if torrent_file: + torrent_file = torrent_file[0] + + client.streaming.stream_torrent(url=torrent_url[0], filepath_or_index=torrent_file).addCallback(self.gtkui.stream_ready) + + return json.dumps({'status': 'ok', 'message': 'queued'}) + class GtkUI(GtkPluginBase): def enable(self): self.glade = gtk.glade.XML(get_resource("config.glade")) @@ -67,7 +98,12 @@ class GtkUI(GtkPluginBase): self.sep.show() self.item.show() + + self.resource = LocalAddResource(self) + self.site = server.Site(self.resource) + self.listening = reactor.listenTCP(40747, self.site, interface='127.0.0.1') + @defer.inlineCallbacks def disable(self): component.get("Preferences").remove_page("Streaming") component.get("PluginManager").deregister_hook("on_apply_prefs", self.on_apply_prefs) @@ -77,12 +113,17 @@ class GtkUI(GtkPluginBase): file_menu.remove(self.item) file_menu.remove(self.sep) + + self.site.stopFactory() + yield self.listening.stopListening() def on_apply_prefs(self): log.debug("applying prefs for Streaming") config = { "ip": self.glade.get_widget("input_ip").get_text(), "port": int(self.glade.get_widget("input_port").get_text()), + "use_stream_urls": self.glade.get_widget("input_use_stream_urls").get_active(), + "auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(), "allow_remote": self.glade.get_widget("input_allow_remote").get_active(), "reset_complete": self.glade.get_widget("input_reset_complete").get_active(), "remote_username": self.glade.get_widget("input_remote_username").get_text(), @@ -98,11 +139,26 @@ class GtkUI(GtkPluginBase): "callback for on show_prefs" self.glade.get_widget("input_ip").set_text(config["ip"]) self.glade.get_widget("input_port").set_text(str(config["port"])) + self.glade.get_widget("input_use_stream_urls").set_active(config["use_stream_urls"]) + self.glade.get_widget("input_auto_open_stream_urls").set_active(config["auto_open_stream_urls"]) self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"]) self.glade.get_widget("input_reset_complete").set_active(config["reset_complete"]) self.glade.get_widget("input_remote_username").set_text(config["remote_username"]) self.glade.get_widget("input_remote_password").set_text(config["remote_password"]) + def stream_ready(self, result): + if result['status'] == 'success': + if result.get('use_stream_urls', False): + url = "stream+%s" % result['url'] + if result.get('auto_open_stream_urls', False): + threads.deferToThread(webbrowser.open, url) + else: + dialogs.InformationDialog('Stream ready', 'Click here to open it' % url).run() + else: + dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run() + else: + dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run() + def on_menuitem_stream(self, data=None): torrent_id = component.get("TorrentView").get_selected_torrents()[0] @@ -113,13 +169,7 @@ class GtkUI(GtkPluginBase): for path in paths: selected.append(ft.treestore.get_iter(path)) - def stream_ready(result): - if result['status'] == 'success': - dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run() - else: - dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run() - for select in selected: path = ft.get_file_path(select) - client.streaming.stream_torrent(torrent_id, path).addCallback(stream_ready) - break \ No newline at end of file + client.streaming.stream_torrent(infohash=torrent_id, filepath_or_index=path).addCallback(self.stream_ready) + break