mirror of
https://github.com/JohnDoee/deluge-streaming/
synced 2026-07-01 07:31:17 -07:00
restructured plugin
This commit is contained in:
@@ -40,25 +40,23 @@
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
import time
|
||||
import urllib
|
||||
|
||||
import deluge.configmanager
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from deluge import component
|
||||
from deluge._libtorrent import lt
|
||||
from deluge.core.rpcserver import export
|
||||
from deluge.plugins.pluginbase import CorePluginBase
|
||||
|
||||
from twisted.internet import reactor, defer, task
|
||||
from twisted.python import randbytes
|
||||
from twisted.web import server, resource, static, http
|
||||
from twisted.web.static import StaticProducer
|
||||
|
||||
import deluge.component as component
|
||||
import deluge.configmanager
|
||||
from deluge.core.rpcserver import export
|
||||
from deluge.log import LOG as log
|
||||
from deluge.plugins.pluginbase import CorePluginBase
|
||||
from twisted.web import server, resource, static, http, client
|
||||
|
||||
from .filelike import FilelikeObjectResource
|
||||
from .resource import Resource
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -68,22 +66,17 @@ DEFAULT_PREFS = {
|
||||
'port': 46123,
|
||||
'allow_remote': False,
|
||||
'reset_complete': True,
|
||||
'use_stream_urls': True,
|
||||
'auto_open_stream_urls': False,
|
||||
'remote_username': 'username',
|
||||
'remote_password': 'password',
|
||||
}
|
||||
|
||||
from .filelike import FilelikeObjectResource
|
||||
|
||||
MIN_QUEUE_CHUNKS = 6
|
||||
EXPECTED_PERCENT = 0.3
|
||||
EXPECTED_SIZE = 5*1024*1024
|
||||
HANDLERS_TIMEOUT = timedelta(hours=12)
|
||||
|
||||
class UnknownTorrentException(Exception):
|
||||
pass
|
||||
|
||||
class UnknownFileException(Exception):
|
||||
pass
|
||||
def sleep(seconds):
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(seconds, d.callback, seconds)
|
||||
return d
|
||||
|
||||
class FileServeResource(resource.Resource):
|
||||
isLeaf = True
|
||||
@@ -106,35 +99,12 @@ class FileServeResource(resource.Resource):
|
||||
if key not in self.file_mapping:
|
||||
return resource.NoResource().render(request)
|
||||
|
||||
v = self.file_mapping[key]
|
||||
if isinstance(v, static.File):
|
||||
return v.render_GET(request)
|
||||
f = self.file_mapping[key]
|
||||
if f.is_complete():
|
||||
return static.File(f.full_path).render_GET(request)
|
||||
else:
|
||||
tf = v.copy()
|
||||
tf.open()
|
||||
return FilelikeObjectResource(tf, tf.size).render_GET(request)
|
||||
|
||||
class AddTorrentResource(Resource):
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, client, *args, **kwargs):
|
||||
self.client = client
|
||||
Resource.__init__(self, *args, **kwargs)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def render_POST(self, request):
|
||||
torrent_data = request.args.get('torrent_data', None)
|
||||
if not torrent_data:
|
||||
defer.returnValue(json.dumps({'status': 'error', 'message': 'missing torrent_data in request'}))
|
||||
|
||||
torrent_data = torrent_data[0].encode('base64')
|
||||
|
||||
torrent_id = yield self.client.add_torrent(torrent_data)
|
||||
|
||||
if torrent_id is None:
|
||||
defer.returnValue(json.dumps({'status': 'error', 'message': 'failed to add torrent'}))
|
||||
|
||||
defer.returnValue(json.dumps({'status': 'success', 'infohash': torrent_id, 'message': 'torrent added successfully'}))
|
||||
tfr = f.open()
|
||||
return FilelikeObjectResource(tfr, f.size).render_GET(request)
|
||||
|
||||
class StreamResource(Resource):
|
||||
isLeaf = True
|
||||
@@ -157,310 +127,392 @@ class StreamResource(Resource):
|
||||
result = yield self.client.stream_torrent(infohash[0], path[0])
|
||||
defer.returnValue(json.dumps(result))
|
||||
|
||||
class TorrentFile(object):
|
||||
file_handler = None
|
||||
def __init__(self, torrent_handler, file_path, torrent_file_path, size, chunk_size, offset, file_index):
|
||||
self.torrent_handler = torrent_handler
|
||||
self.file_path = self.path = file_path
|
||||
self.torrent_file_path = torrent_file_path
|
||||
self.first_chunk = offset / chunk_size
|
||||
self.last_chunk = (offset + size) / chunk_size
|
||||
self.chunk_size = chunk_size
|
||||
self.offset = offset
|
||||
self.file_index = file_index
|
||||
self.size = size
|
||||
self.last_requested_chunk = self.first_chunk
|
||||
self.is_closed = False
|
||||
self.is_active = False
|
||||
self.last_activity = datetime.now()
|
||||
self.end_chunks = []
|
||||
class UnknownTorrentException(Exception):
|
||||
pass
|
||||
|
||||
class UnknownFileException(Exception):
|
||||
pass
|
||||
|
||||
class TorrentFileReader(object):
|
||||
def __init__(self, torrent_file):
|
||||
self.torrent_file = torrent_file
|
||||
self.size = torrent_file.size
|
||||
self.position = 0
|
||||
|
||||
self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset
|
||||
|
||||
self.torrent_handler.torrent_files[torrent_file_path].append(self)
|
||||
|
||||
def open(self):
|
||||
if self.is_closed:
|
||||
raise IOError('Unable to reopen file')
|
||||
|
||||
self.file_handler = open(self.file_path, 'rb')
|
||||
|
||||
def get_chunk(self, tell):
|
||||
i = (tell + 1) - self.first_chunk_end
|
||||
if i <= 0:
|
||||
offset = 0
|
||||
else:
|
||||
offset = (i / self.chunk_size) + 1
|
||||
|
||||
return self.first_chunk + offset, self.first_chunk_end + (offset * self.chunk_size)
|
||||
|
||||
def prepare_torrent(self, buffer_pieces):
|
||||
self.torrent_handler.schedule_chunk(self.first_chunk, 0)
|
||||
self.torrent_handler.schedule_chunk(self.last_chunk, 0)
|
||||
|
||||
self.end_chunks.append(self.last_chunk)
|
||||
|
||||
if self.first_chunk != self.last_chunk:
|
||||
self.torrent_handler.schedule_chunk(self.last_chunk-1, 0)
|
||||
self.end_chunks.append(self.last_chunk-1)
|
||||
|
||||
for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:min(self.first_chunk+buffer_pieces, self.last_chunk)+1], self.first_chunk):
|
||||
self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk)
|
||||
self.waiting_for_piece = None
|
||||
self.current_piece = None
|
||||
self.current_piece_data = None
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def read(self, size=1024):
|
||||
self.is_active = True
|
||||
self.last_activity = datetime.now()
|
||||
required_piece, read_position = self.torrent_file.get_piece_info(self.position)
|
||||
|
||||
tell = self.tell()
|
||||
chunk, end_of_chunk = self.get_chunk(tell)
|
||||
self.last_requested_chunk = chunk
|
||||
if self.current_piece != required_piece:
|
||||
logger.debug('We are missing piece %i and it is required, requesting' % (required_piece, ))
|
||||
self.waiting_for_piece = required_piece
|
||||
self.current_piece_data = yield self.torrent_file.get_piece_data(required_piece)
|
||||
self.current_piece = required_piece
|
||||
self.waiting_for_piece = None
|
||||
|
||||
logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell))
|
||||
yield self.wait_chunk_complete(chunk)
|
||||
logger.debug('done waiting %s, %s, %s' % (chunk, size, tell))
|
||||
logger.debug('We can read from local piece from %s size %s from position %s' % (read_position, size, self.position))
|
||||
data = self.current_piece_data[read_position:read_position+size]
|
||||
self.position += len(data)
|
||||
|
||||
defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size)))
|
||||
|
||||
def wait_chunk_complete(self, chunk):
|
||||
d = defer.Deferred()
|
||||
|
||||
def check_if_done():
|
||||
if self.torrent_handler.torrent.status.pieces[chunk]:
|
||||
return d.callback(True)
|
||||
|
||||
self.torrent_handler.schedule_chunk(chunk, 0)
|
||||
|
||||
if self.is_closed:
|
||||
logger.debug('The file closed, shutting down torrent')
|
||||
return
|
||||
|
||||
reactor.callLater(1.0, check_if_done)
|
||||
|
||||
check_if_done()
|
||||
|
||||
return d
|
||||
|
||||
def seek(self, offset, whence=os.SEEK_SET):
|
||||
return self.file_handler.seek(offset, whence)
|
||||
defer.returnValue(data)
|
||||
|
||||
def tell(self):
|
||||
return self.file_handler.tell()
|
||||
return self.position
|
||||
|
||||
def close(self):
|
||||
if not self.is_closed and self in self.torrent_handler.torrent_files[self.torrent_file_path]:
|
||||
self.torrent_handler.torrent_files[self.torrent_file_path].remove(self)
|
||||
self.is_closed = True
|
||||
|
||||
if self.file_handler:
|
||||
return self.file_handler.close()
|
||||
self.torrent_file.close(self)
|
||||
|
||||
def copy(self):
|
||||
tf = TorrentFile(self.torrent_handler, self.file_path, self.torrent_file_path,
|
||||
self.size, self.chunk_size, self.offset, self.file_index)
|
||||
return tf
|
||||
def seek(self, offset, whence=os.SEEK_SET):
|
||||
self.position = offset
|
||||
|
||||
class TorrentHandler(object):
|
||||
def __init__(self, torrent, torrent_id, core):
|
||||
class TorrentFile(object): # can be read from, knows about itself
|
||||
def __init__(self, torrent, first_piece, last_piece, piece_size, offset, path, full_path, size, index):
|
||||
self.torrent = torrent
|
||||
self.torrent_handle = torrent.handle
|
||||
self.torrent_id = torrent_id
|
||||
self.core = core
|
||||
self.priorities = [0] * len(torrent.get_status(['files'])['files'])
|
||||
self.torrent_files = defaultdict(list)
|
||||
self.priorities_increased = {}
|
||||
# need to blackhole all pieces not downloaded yet
|
||||
self.last_activity = datetime.now()
|
||||
self.update_chunk_priorities()
|
||||
self.first_piece = first_piece
|
||||
self.last_piece = last_piece
|
||||
self.piece_size = piece_size
|
||||
self.offset = offset
|
||||
self.path = path
|
||||
self.size = size
|
||||
self.full_path = full_path
|
||||
self.index = index
|
||||
|
||||
self.file_requested = False
|
||||
self.do_shutdown = False
|
||||
self.first_piece_end = self.piece_size * (self.first_piece + 1) - offset
|
||||
self.waiting_pieces = {}
|
||||
self.current_readers = []
|
||||
self.registered_alert = False
|
||||
|
||||
self.alerts = component.get("AlertManager")
|
||||
|
||||
|
||||
def is_buffered(self, first_chunk, last_chunk):
|
||||
buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x]
|
||||
logger.debug('Current buffer status for file (%s): %r' % (first_chunk, buffer_status))
|
||||
if buffer_status:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
def open(self):
|
||||
"""
|
||||
Returns a filelike object
|
||||
"""
|
||||
if not self.registered_alert:
|
||||
self.alerts.register_handler("read_piece_alert", self.on_alert_got_piece_data)
|
||||
self.registered_alert = True
|
||||
tfr = TorrentFileReader(self)
|
||||
self.current_readers.append(tfr)
|
||||
self.file_requested = False
|
||||
return tfr
|
||||
|
||||
def update_priorities(self):
|
||||
self.torrent.set_file_priorities(self.priorities)
|
||||
def close(self, tfr):
|
||||
self.current_readers.remove(tfr)
|
||||
self.torrent.unprioritize_pieces(tfr)
|
||||
|
||||
def schedule_chunk(self, chunk, distance):
|
||||
if chunk not in self.priorities_increased:
|
||||
logger.debug('Scheduled chunk %s at distance %s' % (chunk, distance))
|
||||
|
||||
self.torrent_handle.piece_priority(chunk, (7 if distance <= 4 else 6))
|
||||
self.torrent_handle.set_piece_deadline(chunk, 700*(distance+1))
|
||||
|
||||
self.priorities_increased[chunk] = True
|
||||
def is_complete(self):
|
||||
torrent_status = self.torrent.torrent.get_status(['file_progress', 'state'])
|
||||
file_progress = torrent_status['file_progress']
|
||||
return file_progress[self.index] == 1.0
|
||||
|
||||
def update_chunk_priority(self, tfs):
|
||||
handled_heads = []
|
||||
for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True):
|
||||
if datetime.now() - tf.last_activity > HANDLERS_TIMEOUT:
|
||||
logger.debug('Torrentfile timed out')
|
||||
tf.close()
|
||||
continue
|
||||
|
||||
if tf.last_requested_chunk is not None:
|
||||
if handled_heads:
|
||||
if tf.last_requested_chunk in handled_heads:
|
||||
continue
|
||||
|
||||
already_queued = False
|
||||
for i in sorted(handled_heads):
|
||||
if i > tf.last_requested_chunk:
|
||||
if 0 not in self.torrent.status.pieces[tf.last_requested_chunk:i]:
|
||||
already_queued = True
|
||||
break
|
||||
if already_queued:
|
||||
continue
|
||||
|
||||
offset = tf.last_requested_chunk + 1
|
||||
|
||||
current_buffer_offset = 5
|
||||
for chunk in tf.end_chunks:
|
||||
if not self.torrent.status.pieces[chunk]:
|
||||
logger.info('End chunks are not downloaded, setting buffer offset differently')
|
||||
current_buffer_offset = 20
|
||||
break
|
||||
|
||||
currently_downloading = set()
|
||||
for peer in self.torrent.handle.get_peer_info():
|
||||
if peer.downloading_piece_index != -1:
|
||||
currently_downloading.add(peer.downloading_piece_index)
|
||||
|
||||
status_increase_count = 0
|
||||
current_buffer = 0
|
||||
found_buffer_end = False
|
||||
for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:tf.last_chunk+1], offset):
|
||||
if not chunk_status and chunk not in currently_downloading:
|
||||
if not found_buffer_end:
|
||||
logger.debug('Found buffer end at %s, have a buffer of %s' % (chunk, current_buffer))
|
||||
found_buffer_end = True
|
||||
|
||||
if status_increase_count <= MIN_QUEUE_CHUNKS:
|
||||
self.schedule_chunk(chunk, chunk-offset)
|
||||
elif self.torrent_handle.piece_priority(chunk) == 0:
|
||||
self.torrent_handle.piece_priority(chunk, 1)
|
||||
|
||||
status_increase_count += 1
|
||||
elif not found_buffer_end and chunk not in currently_downloading:
|
||||
current_buffer += 1
|
||||
|
||||
if status_increase_count >= max(MIN_QUEUE_CHUNKS, current_buffer-current_buffer_offset):
|
||||
break
|
||||
|
||||
handled_heads.append(tf.last_requested_chunk)
|
||||
|
||||
return bool(handled_heads)
|
||||
|
||||
def update_chunk_priorities(self): # TODO: check if torrent still exists
|
||||
start_time = time.time()
|
||||
file_progress = self.torrent.get_status(['file_progress'])['file_progress']
|
||||
incomplete_files = False
|
||||
|
||||
for torrent_file_path, tfs in self.torrent_files.items():
|
||||
if not tfs:
|
||||
logger.debug('No heads left for %r' % torrent_file_path)
|
||||
del self.torrent_files[torrent_file_path]
|
||||
continue
|
||||
|
||||
tf = tfs[0]
|
||||
if file_progress[tf.file_index] == 1.0:
|
||||
logger.info('%s is already complete, skipping' % torrent_file_path)
|
||||
continue
|
||||
|
||||
if self.update_chunk_priority(tfs):
|
||||
self.last_activity = datetime.now()
|
||||
|
||||
incomplete_files = True
|
||||
|
||||
if not incomplete_files and self.core.config['reset_complete'] and not all(self.priorities):
|
||||
logger.info('We are not doing any file streamings, but not downloading all files, changing that.')
|
||||
self.priorities = [1] * len(self.priorities)
|
||||
self.update_priorities()
|
||||
|
||||
if datetime.now() - self.last_activity > HANDLERS_TIMEOUT:
|
||||
logger.debug('Torrent handler idle, killing myself.')
|
||||
if self.torrent_id in self.core.torrent_handlers:
|
||||
del self.core.torrent_handlers[self.torrent_id]
|
||||
def get_piece_info(self, tell):
|
||||
return divmod((self.offset + tell), self.piece_size)
|
||||
|
||||
def on_alert_got_piece_data(self, alert):
|
||||
torrent_id = str(alert.handle.info_hash())
|
||||
if torrent_id != self.torrent.infohash:
|
||||
return
|
||||
|
||||
logger.debug('Took %s seconds to handle a loop' % (time.time() - start_time))
|
||||
reactor.callLater(1, self.update_chunk_priorities)
|
||||
|
||||
def blackhole_all_pieces(self, first_chunk, last_chunk):
|
||||
for chunk in range(first_chunk, last_chunk+1):
|
||||
self.torrent_handle.piece_priority(chunk, 0)
|
||||
logger.debug('Got piece data for piece %s' % alert.piece)
|
||||
if alert.piece not in self.waiting_pieces:
|
||||
logger.debug('Got data for piece %i, but no data needed for this piece?' % alert.piece)
|
||||
return
|
||||
# TODO: check piece size is not zero
|
||||
self.waiting_pieces[alert.piece].callback(alert.buffer)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_file(self, filepath_or_index):
|
||||
status = self.torrent.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path'])
|
||||
pieces = self.torrent.status.pieces
|
||||
piece_length = status['piece_length']
|
||||
files = status['files']
|
||||
def get_piece_data(self, piece):
|
||||
logger.debug('Trying to get piece data for piece %s' % piece)
|
||||
for reader in self.current_readers:
|
||||
if reader.current_piece == piece:
|
||||
defer.returnValue(reader.current_piece_data)
|
||||
|
||||
for i, f, priority, progress in zip(range(len(files)), files, status['file_priorities'], status['file_progress']):
|
||||
if i == filepath_or_index or f['path'] == filepath_or_index:
|
||||
f['first_piece'] = f['offset'] / piece_length
|
||||
f['last_piece'] = (f['offset'] + f['size']) / piece_length
|
||||
f['pieces'] = pieces[f['first_piece']:f['last_piece']+1]
|
||||
f['priority'] = priority
|
||||
f['progress'] = progress
|
||||
|
||||
break
|
||||
else:
|
||||
raise UnknownFileException()
|
||||
if piece not in self.waiting_pieces:
|
||||
self.waiting_pieces[piece] = defer.Deferred()
|
||||
|
||||
fp = os.path.join(status['save_path'], f['path'])
|
||||
|
||||
if progress == 1: # file is complete, no need to fire up all the torrent jazz
|
||||
defer.returnValue(static.File(fp))
|
||||
|
||||
self.priorities = [0] * len(self.priorities)
|
||||
self.priorities[f['index']] = 3
|
||||
current_tfs = []
|
||||
for tfs in self.torrent_files.values():
|
||||
if not tfs:
|
||||
continue
|
||||
tf = tfs[0]
|
||||
self.priorities[tf.file_index] = 3
|
||||
current_tfs.append((tf, self.torrent_handle.piece_priorities()[tf.first_chunk:tf.last_chunk+1]))
|
||||
|
||||
self.update_priorities()
|
||||
|
||||
for tf, chunk_status in current_tfs:
|
||||
logger.info('Setting chunks to old status')
|
||||
for i, chunk in enumerate(chunk_status, tf.first_chunk):
|
||||
logger.debug('Setting status on chunk %s back to %s' % (i, chunk))
|
||||
self.torrent_handle.piece_priority(i, chunk)
|
||||
|
||||
self.torrent.resume()
|
||||
|
||||
percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT))
|
||||
size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces']))
|
||||
expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream.
|
||||
|
||||
tf = TorrentFile(self, fp, f['path'], f['size'], status['piece_length'], f['offset'], f['index'])
|
||||
|
||||
if len(self.torrent_files[f['path']]) == 1:
|
||||
self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk)
|
||||
|
||||
tf.prepare_torrent(expected_pieces)
|
||||
|
||||
for _ in range(300):
|
||||
if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces) and self.is_buffered(tf.last_chunk, tf.last_chunk):
|
||||
break
|
||||
|
||||
logger.debug('Waiting for %s' % piece)
|
||||
self.torrent.schedule_piece(self, piece, 0)
|
||||
while not self.torrent.torrent.handle.have_piece(piece):
|
||||
if self.do_shutdown:
|
||||
raise Exception()
|
||||
logger.debug('Did not have piece %i, waiting' % piece)
|
||||
self.torrent.unrelease()
|
||||
yield sleep(1)
|
||||
|
||||
defer.returnValue(tf)
|
||||
self.torrent.torrent.handle.read_piece(piece)
|
||||
|
||||
data = yield self.waiting_pieces[piece]
|
||||
if piece in self.waiting_pieces:
|
||||
del self.waiting_pieces[piece]
|
||||
logger.debug('Done waiting for piece %i, returning data' % piece)
|
||||
defer.returnValue(data)
|
||||
|
||||
def shutdown(self):
|
||||
#self.alerts.deregister_handler(self.on_alert_torrent_finished)
|
||||
self.do_shutdown = True
|
||||
|
||||
def sleep(seconds):
|
||||
d = defer.Deferred()
|
||||
reactor.callLater(seconds, d.callback, seconds)
|
||||
return d
|
||||
class Torrent(object):
|
||||
def __init__(self, torrent_handler, infohash):
|
||||
self.infohash = infohash
|
||||
self.torrent = component.get("TorrentManager").torrents.get(infohash, None)
|
||||
self.torrent_handler = torrent_handler
|
||||
|
||||
if not self.torrent:
|
||||
raise UnknownTorrentException('%s is not a known infohash' % infohash)
|
||||
|
||||
self.torrent_files = None
|
||||
self.priority_increased = defaultdict(set)
|
||||
self.do_shutdown = False
|
||||
self.torrent_released = False # set to True if all the files are set to download
|
||||
|
||||
|
||||
self.populate_files()
|
||||
self.file_priorities = [0] * len(self.torrent_files)
|
||||
|
||||
self.last_piece = self.torrent_files[-1].last_piece
|
||||
self.torrent.handle.set_sequential_download(True)
|
||||
reactor.callLater(0, self.update_piece_priority)
|
||||
reactor.callLater(0, self.blackhole_all_pieces, 0, self.last_piece)
|
||||
|
||||
def populate_files(self):
|
||||
self.torrent_files = []
|
||||
|
||||
status = self.torrent.get_status(['piece_length', 'files', 'save_path'])
|
||||
piece_length = status['piece_length']
|
||||
files = status['files']
|
||||
save_path = status['save_path']
|
||||
|
||||
for f in files:
|
||||
first_piece = f['offset'] / piece_length
|
||||
last_piece = (f['offset'] + f['size']) / piece_length
|
||||
full_path = os.path.join(save_path, f['path'])
|
||||
self.torrent_files.append(TorrentFile(self, first_piece, last_piece, piece_length, f['offset'],
|
||||
f['path'], full_path, f['size'], f['index']))
|
||||
|
||||
return files
|
||||
|
||||
def find_file(self, file_or_index=None):
|
||||
best_file = None
|
||||
biggest_file_size = 0
|
||||
|
||||
for i, f in enumerate(self.torrent_files):
|
||||
if file_or_index is not None:
|
||||
if i == file_or_index or f.path == file_or_index:
|
||||
best_file = f
|
||||
break
|
||||
else:
|
||||
if f.size > biggest_file_size:
|
||||
best_file = f
|
||||
|
||||
return best_file
|
||||
|
||||
def get_file(self, file_or_index=None):
|
||||
f = self.find_file(file_or_index)
|
||||
if f is None:
|
||||
raise UnknownFileException('Was unable to find %s' % file_or_index)
|
||||
|
||||
return f
|
||||
|
||||
def unprioritize_pieces(self, tfr):
|
||||
logger.debug('Unprioritizing pieces for %s' % tfr)
|
||||
currently_downloading = self.get_currently_downloading()
|
||||
|
||||
for piece, increased_by in self.priority_increased.items():
|
||||
if tfr in increased_by:
|
||||
increased_by.remove(tfr)
|
||||
if not increased_by and piece not in currently_downloading and not self.torrent.status.pieces[piece]:
|
||||
logger.debug('Unprioritizing piece %s' % piece)
|
||||
self.torrent.handle.piece_priority(piece, 0)
|
||||
|
||||
def get_currently_downloading(self):
|
||||
currently_downloading = set()
|
||||
for peer in self.torrent.handle.get_peer_info():
|
||||
if peer.downloading_piece_index != -1:
|
||||
currently_downloading.add(peer.downloading_piece_index)
|
||||
|
||||
return currently_downloading
|
||||
|
||||
def blackhole_all_pieces(self, first_piece, last_piece):
|
||||
currently_downloading = self.get_currently_downloading()
|
||||
|
||||
logger.debug('Blacklisting pieces from %i to %i skipping %r' % (first_piece, last_piece, currently_downloading))
|
||||
for piece in range(first_piece, last_piece+1):
|
||||
if piece not in currently_downloading and not self.torrent.status.pieces[piece]:
|
||||
if piece in self.priority_increased:
|
||||
continue
|
||||
logger.debug('Setting piece priority %s to blacklist' % piece)
|
||||
self.torrent.handle.piece_priority(piece, 0)
|
||||
|
||||
def unrelease(self):
|
||||
if self.torrent_released:
|
||||
logger.debug('Unreleasing %s' % self.infohash)
|
||||
self.torrent_released = False
|
||||
self.torrent.set_file_priorities(self.file_priorities)
|
||||
self.blackhole_all_pieces(0, self.last_piece)
|
||||
|
||||
def get_torrent_file(self, file_or_index):
|
||||
f = self.get_file(file_or_index)
|
||||
f.file_requested = True
|
||||
|
||||
self.torrent.resume()
|
||||
|
||||
should_update_priorities = False
|
||||
if self.file_priorities[f.index] == 0:
|
||||
self.file_priorities[f.index] = 3
|
||||
should_update_priorities = True
|
||||
|
||||
if self.torrent_released:
|
||||
should_update_priorities = True
|
||||
|
||||
if should_update_priorities and not f.is_complete(): # Need to do this stuff on seek too
|
||||
self.unrelease()
|
||||
|
||||
return f
|
||||
|
||||
def shutdown(self):
|
||||
logger.info('Shutting down torrent %s' % self.infohash)
|
||||
self.do_shutdown = True
|
||||
|
||||
for tf in self.torrent_files:
|
||||
tf.shutdown()
|
||||
|
||||
def schedule_piece(self, torrent_file, piece, distance):
|
||||
if torrent_file not in self.priority_increased[piece]:
|
||||
if not self.priority_increased[piece]:
|
||||
self.priority_increased[piece].add(torrent_file)
|
||||
|
||||
logger.debug('Scheduled piece %s at distance %s' % (piece, distance))
|
||||
|
||||
self.torrent.handle.piece_priority(piece, (7 if distance <= 4 else 6))
|
||||
self.torrent.handle.set_piece_deadline(piece, 700*(distance+1))
|
||||
|
||||
self.priority_increased[piece].add(torrent_file)
|
||||
|
||||
def do_pieces_schedule(self, torrent_file, currently_downloading, from_piece):
|
||||
logger.debug('Looking for stuff to do with pieces for file %s from piece %s' % (torrent_file, from_piece))
|
||||
|
||||
priority_increased = 0
|
||||
chain_size = 0
|
||||
download_chain_size = 0
|
||||
end_of_chain = False
|
||||
|
||||
current_buffer_offset = 5
|
||||
if self.torrent.status.pieces[torrent_file.last_piece]:
|
||||
if torrent_file.first_piece != torrent_file.last_piece and self.torrent.status.pieces[torrent_file.last_piece-1] \
|
||||
or torrent_file.first_piece == torrent_file.last_piece:
|
||||
current_buffer_offset = 20
|
||||
|
||||
for piece, status in enumerate(self.torrent.status.pieces[from_piece:torrent_file.last_piece+1], from_piece):
|
||||
if not end_of_chain:
|
||||
if status:
|
||||
chain_size += 1
|
||||
elif piece in currently_downloading:
|
||||
download_chain_size += 1
|
||||
|
||||
if not status and piece not in currently_downloading:
|
||||
if not end_of_chain:
|
||||
status_increase = max(11, chain_size-current_buffer_offset)
|
||||
end_of_chain = True
|
||||
|
||||
priority_increased += 1
|
||||
if priority_increased >= status_increase:
|
||||
logger.debug('Done increasing priority for %i pieces' % status_increase)
|
||||
break
|
||||
|
||||
self.schedule_piece(torrent_file, piece, piece-from_piece)
|
||||
else:
|
||||
logger.info('We are done with the rest of this chain, we might be able to increase others')
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def update_piece_priority(self): # if all do_pieces_schedule returns true, allow all pices of file to be downloaded or whole torernt
|
||||
if self.do_shutdown:
|
||||
return
|
||||
|
||||
logger.debug('Updating piece priority for %s' % self.infohash)
|
||||
|
||||
currently_downloading = set()
|
||||
for peer in self.torrent.handle.get_peer_info():
|
||||
if peer.downloading_piece_index != -1:
|
||||
currently_downloading.add(peer.downloading_piece_index)
|
||||
|
||||
all_heads_done = True
|
||||
for f in self.torrent_files:
|
||||
if not f.file_requested and not f.current_readers:
|
||||
continue
|
||||
|
||||
logger.debug('Rescheduling file %s' % f.path)
|
||||
|
||||
if f.file_requested:
|
||||
all_heads_done &= self.do_pieces_schedule(f, currently_downloading, f.first_piece)
|
||||
self.schedule_piece(f, f.last_piece, 0)
|
||||
if f.first_piece != f.last_piece:
|
||||
self.schedule_piece(f, f.last_piece-1, 1)
|
||||
|
||||
for tfr in f.current_readers:
|
||||
if tfr.waiting_for_piece is not None:
|
||||
logger.debug('Scheduling based on waiting for piece %s' % tfr.waiting_for_piece)
|
||||
all_heads_done &= self.do_pieces_schedule(f, currently_downloading, tfr.waiting_for_piece)
|
||||
elif tfr.current_piece is not None:
|
||||
logger.debug('Scheduling based on current piece %s' % tfr.current_piece)
|
||||
all_heads_done &= self.do_pieces_schedule(f, currently_downloading, tfr.current_piece)
|
||||
|
||||
if all_heads_done and not self.torrent_released:
|
||||
logger.debug('We are already done with all heads, figuring out what to do next')
|
||||
|
||||
if self.torrent_handler.config['reset_complete']:
|
||||
self.torrent_released = True
|
||||
logger.debug('Resetting all disabled files')
|
||||
file_priorities = [(1 if fp == 0 else fp) for fp in self.file_priorities]
|
||||
self.torrent.set_file_priorities(file_priorities)
|
||||
|
||||
if not all_heads_done and self.torrent_released:
|
||||
logger.debug('Seems like the torrent was released too early')
|
||||
self.unrelease()
|
||||
|
||||
reactor.callLater(0.3, self.update_piece_priority)
|
||||
|
||||
class TorrentHandler(object):
|
||||
def __init__(self, config):
|
||||
self.torrents = {}
|
||||
self.config = config
|
||||
|
||||
self.alerts = component.get("AlertManager")
|
||||
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
|
||||
|
||||
def get_stream(self, infohash, file_or_index=None):
|
||||
logger.info('Trying to stream infohash %s and file %s' % (infohash, file_or_index))
|
||||
if infohash not in self.torrents:
|
||||
self.torrents[infohash] = Torrent(self, infohash)
|
||||
|
||||
return self.torrents[infohash].get_torrent_file(file_or_index)
|
||||
|
||||
def on_alert_torrent_removed(self, alert):
|
||||
try:
|
||||
torrent_id = str(alert.handle.info_hash())
|
||||
except (RuntimeError, KeyError):
|
||||
logger.warning('Failed to handle on torrent remove alert')
|
||||
return
|
||||
|
||||
if torrent_id not in self.torrents:
|
||||
return
|
||||
|
||||
self.torrents[torrent_id].shutdown()
|
||||
del self.torrents[torrent_id]
|
||||
|
||||
def shutdown(self):
|
||||
logger.debug('Shutting down TorrentHandler')
|
||||
self.alerts.deregister_handler(self.on_alert_torrent_removed)
|
||||
for torrent in self.torrents.values():
|
||||
torrent.shutdown()
|
||||
|
||||
class Core(CorePluginBase):
|
||||
def enable(self):
|
||||
@@ -470,9 +522,6 @@ class Core(CorePluginBase):
|
||||
self.resource = Resource()
|
||||
self.resource.putChild('file', self.fsr)
|
||||
if self.config['allow_remote']:
|
||||
self.resource.putChild('add_torrent', AddTorrentResource(username=self.config['remote_username'],
|
||||
password=self.config['remote_password'],
|
||||
client=self))
|
||||
self.resource.putChild('stream', StreamResource(username=self.config['remote_username'],
|
||||
password=self.config['remote_password'],
|
||||
client=self))
|
||||
@@ -487,7 +536,7 @@ class Core(CorePluginBase):
|
||||
except AttributeError:
|
||||
logger.warning('Unable to exclude partial pieces')
|
||||
|
||||
self.torrent_handlers = {}
|
||||
self.torrent_handler = TorrentHandler(self.config)
|
||||
|
||||
try:
|
||||
self.listening = reactor.listenTCP(self.config['port'], self.site, interface=self.config['ip'])
|
||||
@@ -497,6 +546,7 @@ class Core(CorePluginBase):
|
||||
@defer.inlineCallbacks
|
||||
def disable(self):
|
||||
self.site.stopFactory()
|
||||
self.torrent_handler.shutdown()
|
||||
yield self.listening.stopListening()
|
||||
|
||||
def update(self):
|
||||
@@ -506,7 +556,6 @@ class Core(CorePluginBase):
|
||||
@defer.inlineCallbacks
|
||||
def set_config(self, config):
|
||||
"""Sets the config dictionary"""
|
||||
do_reload = False
|
||||
for key in config.keys():
|
||||
self.config[key] = config[key]
|
||||
self.config.save()
|
||||
@@ -521,43 +570,36 @@ class Core(CorePluginBase):
|
||||
|
||||
@export
|
||||
@defer.inlineCallbacks
|
||||
def add_torrent(self, torrent_data):
|
||||
core = component.get("Core")
|
||||
tid = yield core.add_torrent_file('file.torrent', torrent_data, {'add_paused': True})
|
||||
def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None):
|
||||
tor = component.get("TorrentManager").torrents.get(infohash, None)
|
||||
|
||||
tor = component.get("TorrentManager").torrents.get(tid, None)
|
||||
|
||||
state = tor.get_status(['files'])
|
||||
tor.set_file_priorities([0] * len(state['files']))
|
||||
|
||||
defer.returnValue(tid)
|
||||
|
||||
def get_torrent_handler(self, tid):
|
||||
if tid not in self.torrent_handlers:
|
||||
tor = component.get("TorrentManager").torrents.get(tid, None)
|
||||
if tor is None:
|
||||
logger.info('Did not find torrent, must add it')
|
||||
|
||||
if tor is None:
|
||||
raise UnknownTorrentException()
|
||||
if not filedump and url:
|
||||
filedump = yield client.getPage(url)
|
||||
|
||||
self.torrent_handlers[tid] = TorrentHandler(tor, tid, self)
|
||||
if not filedump:
|
||||
defer.returnValue({'status': 'error', 'message': 'unable to find torrent, provide infohash, url or filedump'})
|
||||
|
||||
torrent_info = lt.torrent_info(lt.bdecode(filedump))
|
||||
infohash = str(torrent_info.info_hash())
|
||||
|
||||
return self.torrent_handlers[tid]
|
||||
|
||||
@export
|
||||
@defer.inlineCallbacks
|
||||
def stream_torrent(self, tid, filepath_or_index):
|
||||
core = component.get("Core")
|
||||
try:
|
||||
yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True})
|
||||
except:
|
||||
defer.returnValue({'status': 'error', 'message': 'failed to add torrent'})
|
||||
|
||||
try:
|
||||
torrent_handler = yield self.get_torrent_handler(tid)
|
||||
except UnknownTorrentException: # torrent isn't added yet
|
||||
defer.returnValue({'status': 'error', 'message': 'torrent_not_found'})
|
||||
|
||||
try:
|
||||
tf = yield torrent_handler.get_file(filepath_or_index)
|
||||
except UnknownFileException:
|
||||
defer.returnValue({'status': 'error', 'message': 'file_not_found'})
|
||||
tf = self.torrent_handler.get_stream(infohash, filepath_or_index)
|
||||
except UnknownTorrentException:
|
||||
defer.returnValue({'status': 'error', 'message': 'unable to find torrent, probably failed to add it'})
|
||||
|
||||
defer.returnValue({
|
||||
'status': 'success',
|
||||
'use_stream_urls': self.config['use_stream_urls'],
|
||||
'auto_open_stream_urls': self.config['auto_open_stream_urls'],
|
||||
'url': 'http://%s:%s/file/%s/%s' % (self.config.config['ip'], self.config.config['port'],
|
||||
self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path)))
|
||||
self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path)))
|
||||
})
|
||||
@@ -33,6 +33,28 @@
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkLabel" id="label_use_stream_urls">
|
||||
<property name="visible">True</property>
|
||||
<property name="xalign">0</property>
|
||||
<property name="label" translatable="yes">Use stream urls:</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">2</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkLabel" id="label_auto_open_stream_urls">
|
||||
<property name="visible">True</property>
|
||||
<property name="xalign">0</property>
|
||||
<property name="label" translatable="yes">Auto-open stream urls:</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">3</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkLabel" id="label_reset_complete">
|
||||
<property name="visible">True</property>
|
||||
@@ -40,7 +62,7 @@
|
||||
<property name="label" translatable="yes">Reset "do not download" when streamed file is complete:</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">2</property>
|
||||
<property name="position">4</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
@@ -51,7 +73,7 @@
|
||||
<property name="label" translatable="yes">Allow remote control:</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">3</property>
|
||||
<property name="position">5</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
@@ -62,7 +84,7 @@
|
||||
<property name="label" translatable="yes">Remote username:</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">4</property>
|
||||
<property name="position">6</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
@@ -73,7 +95,7 @@
|
||||
<property name="label" translatable="yes">Remote password:</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">5</property>
|
||||
<property name="position">7</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
@@ -108,7 +130,7 @@
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkCheckButton" id="input_reset_complete">
|
||||
<widget class="GtkCheckButton" id="input_use_stream_urls">
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
</widget>
|
||||
@@ -118,7 +140,7 @@
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkCheckButton" id="input_allow_remote">
|
||||
<widget class="GtkCheckButton" id="input_auto_open_stream_urls">
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
</widget>
|
||||
@@ -128,7 +150,7 @@
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkEntry" id="input_remote_username">
|
||||
<widget class="GtkCheckButton" id="input_reset_complete">
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
</widget>
|
||||
@@ -137,6 +159,26 @@
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkCheckButton" id="input_allow_remote">
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">5</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkEntry" id="input_remote_username">
|
||||
<property name="visible">True</property>
|
||||
<property name="can_focus">True</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">6</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
<child>
|
||||
<widget class="GtkEntry" id="input_remote_password">
|
||||
<property name="visible">True</property>
|
||||
@@ -144,13 +186,13 @@
|
||||
<property name="can_focus">True</property>
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">5</property>
|
||||
<property name="position">7</property>
|
||||
</packing>
|
||||
</child>
|
||||
|
||||
</widget>
|
||||
<packing>
|
||||
<property name="position">1</property>
|
||||
<property name="position">7</property>
|
||||
</packing>
|
||||
</child>
|
||||
</widget>
|
||||
|
||||
@@ -68,6 +68,18 @@ PreferencePage = Ext.extend(Ext.Panel, {
|
||||
fieldLabel: 'IP'
|
||||
}));
|
||||
|
||||
om.bind('use_stream_urls', fieldset.add({
|
||||
xtype: 'checkbox',
|
||||
name: 'use_stream_urls',
|
||||
fieldLabel: 'Use StreamProtocol urls',
|
||||
}));
|
||||
|
||||
om.bind('auto_open_stream_urls', fieldset.add({
|
||||
xtype: 'checkbox',
|
||||
name: 'auto_open_stream_urls',
|
||||
fieldLabel: 'AutoOpen StreamProtocol urls',
|
||||
}));
|
||||
|
||||
om.bind('reset_complete', fieldset.add({
|
||||
xtype: 'checkbox',
|
||||
name: 'reset_complete',
|
||||
@@ -150,10 +162,19 @@ StreamingPlugin = Ext.extend(Deluge.Plugin, {
|
||||
var fileIndex = nodes[0].attributes.fileIndex;
|
||||
var tid = files.torrentId;
|
||||
if (fileIndex >= 0) {
|
||||
deluge.client.streaming.stream_torrent(tid, fileIndex, {
|
||||
deluge.client.streaming.stream_torrent(tid, null, null, fileIndex, {
|
||||
success: function (result) {
|
||||
console.log('Got result', result);
|
||||
if (result.status == 'success') {
|
||||
Ext.Msg.alert('Stream ready', 'URL for stream: <a target="_blank" href="' + result.url + '">' + result.url + '</a>');
|
||||
var url = result.url;
|
||||
if (result.use_stream_urls) {
|
||||
url = 'stream+' + url;
|
||||
if (result.auto_open_stream_urls) {
|
||||
window.location.assign(url);
|
||||
return;
|
||||
}
|
||||
}
|
||||
Ext.Msg.alert('Stream ready', 'URL for stream: <a target="_blank" href="' + url + '">' + url + '</a>');
|
||||
} else {
|
||||
Ext.Msg.alert('Stream failed', 'Error message: ' + result.message);
|
||||
}
|
||||
|
||||
@@ -68,6 +68,7 @@ class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer):
|
||||
if done:
|
||||
self.request.unregisterProducer()
|
||||
self.request.finish()
|
||||
self.stopProducing()
|
||||
self.request = None
|
||||
|
||||
class FilelikeObjectResource(static.File):
|
||||
@@ -141,6 +142,7 @@ class FilelikeObjectResource(static.File):
|
||||
producer = self.makeProducer(request, self.fileObject)
|
||||
|
||||
if request.method == 'HEAD':
|
||||
self.fileObject.close()
|
||||
return ''
|
||||
|
||||
producer.start()
|
||||
|
||||
@@ -37,7 +37,9 @@
|
||||
# statement from all source files in the program, then also delete it here.
|
||||
#
|
||||
|
||||
import json
|
||||
import gtk
|
||||
import webbrowser
|
||||
|
||||
from deluge.log import LOG as log
|
||||
from deluge.ui.client import client
|
||||
@@ -46,8 +48,37 @@ from deluge.plugins.pluginbase import GtkPluginBase
|
||||
import deluge.component as component
|
||||
import deluge.common
|
||||
|
||||
from twisted.internet import reactor, defer, threads
|
||||
from twisted.web import server, resource
|
||||
|
||||
from common import get_resource
|
||||
|
||||
class LocalAddResource(resource.Resource):
|
||||
gtkui = None
|
||||
isLeaf = True
|
||||
|
||||
def __init__(self, gtkui):
|
||||
self.gtkui = gtkui
|
||||
resource.Resource.__init__(self)
|
||||
|
||||
def render_GET(self, request):
|
||||
useragent = request.getHeader('User-Agent')
|
||||
if 'Deluge-Streamer' not in useragent:
|
||||
request.setResponseCode(401)
|
||||
return 'Unauthorized'
|
||||
|
||||
torrent_url = request.args.get('url', None)
|
||||
if not torrent_url:
|
||||
return json.dumps({'status': 'error', 'message': 'missing url in request'})
|
||||
|
||||
torrent_file = request.args.get('file', None)
|
||||
if torrent_file:
|
||||
torrent_file = torrent_file[0]
|
||||
|
||||
client.streaming.stream_torrent(url=torrent_url[0], filepath_or_index=torrent_file).addCallback(self.gtkui.stream_ready)
|
||||
|
||||
return json.dumps({'status': 'ok', 'message': 'queued'})
|
||||
|
||||
class GtkUI(GtkPluginBase):
|
||||
def enable(self):
|
||||
self.glade = gtk.glade.XML(get_resource("config.glade"))
|
||||
@@ -67,7 +98,12 @@ class GtkUI(GtkPluginBase):
|
||||
|
||||
self.sep.show()
|
||||
self.item.show()
|
||||
|
||||
self.resource = LocalAddResource(self)
|
||||
self.site = server.Site(self.resource)
|
||||
self.listening = reactor.listenTCP(40747, self.site, interface='127.0.0.1')
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def disable(self):
|
||||
component.get("Preferences").remove_page("Streaming")
|
||||
component.get("PluginManager").deregister_hook("on_apply_prefs", self.on_apply_prefs)
|
||||
@@ -77,12 +113,17 @@ class GtkUI(GtkPluginBase):
|
||||
|
||||
file_menu.remove(self.item)
|
||||
file_menu.remove(self.sep)
|
||||
|
||||
self.site.stopFactory()
|
||||
yield self.listening.stopListening()
|
||||
|
||||
def on_apply_prefs(self):
|
||||
log.debug("applying prefs for Streaming")
|
||||
config = {
|
||||
"ip": self.glade.get_widget("input_ip").get_text(),
|
||||
"port": int(self.glade.get_widget("input_port").get_text()),
|
||||
"use_stream_urls": self.glade.get_widget("input_use_stream_urls").get_active(),
|
||||
"auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(),
|
||||
"allow_remote": self.glade.get_widget("input_allow_remote").get_active(),
|
||||
"reset_complete": self.glade.get_widget("input_reset_complete").get_active(),
|
||||
"remote_username": self.glade.get_widget("input_remote_username").get_text(),
|
||||
@@ -98,11 +139,26 @@ class GtkUI(GtkPluginBase):
|
||||
"callback for on show_prefs"
|
||||
self.glade.get_widget("input_ip").set_text(config["ip"])
|
||||
self.glade.get_widget("input_port").set_text(str(config["port"]))
|
||||
self.glade.get_widget("input_use_stream_urls").set_active(config["use_stream_urls"])
|
||||
self.glade.get_widget("input_auto_open_stream_urls").set_active(config["auto_open_stream_urls"])
|
||||
self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"])
|
||||
self.glade.get_widget("input_reset_complete").set_active(config["reset_complete"])
|
||||
self.glade.get_widget("input_remote_username").set_text(config["remote_username"])
|
||||
self.glade.get_widget("input_remote_password").set_text(config["remote_password"])
|
||||
|
||||
def stream_ready(self, result):
|
||||
if result['status'] == 'success':
|
||||
if result.get('use_stream_urls', False):
|
||||
url = "stream+%s" % result['url']
|
||||
if result.get('auto_open_stream_urls', False):
|
||||
threads.deferToThread(webbrowser.open, url)
|
||||
else:
|
||||
dialogs.InformationDialog('Stream ready', '<a href="%s">Click here to open it</a>' % url).run()
|
||||
else:
|
||||
dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run()
|
||||
else:
|
||||
dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run()
|
||||
|
||||
def on_menuitem_stream(self, data=None):
|
||||
torrent_id = component.get("TorrentView").get_selected_torrents()[0]
|
||||
|
||||
@@ -113,13 +169,7 @@ class GtkUI(GtkPluginBase):
|
||||
for path in paths:
|
||||
selected.append(ft.treestore.get_iter(path))
|
||||
|
||||
def stream_ready(result):
|
||||
if result['status'] == 'success':
|
||||
dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run()
|
||||
else:
|
||||
dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run()
|
||||
|
||||
for select in selected:
|
||||
path = ft.get_file_path(select)
|
||||
client.streaming.stream_torrent(torrent_id, path).addCallback(stream_ready)
|
||||
break
|
||||
client.streaming.stream_torrent(infohash=torrent_id, filepath_or_index=path).addCallback(self.stream_ready)
|
||||
break
|
||||
|
||||
Reference in New Issue
Block a user