22 Commits
0.3.1 ... 0.6.0

Author SHA1 Message Date
JohnDoee
0bd7b8d40d Merge branch 'release/0.6.0' 2016-06-24 20:42:46 +02:00
JohnDoee
6b2817ce5e bumped version and updated changelog 2016-06-24 20:42:39 +02:00
JohnDoee
0cc8ed4280 improved algorithm a little, changed default to not use streaming url 2016-06-24 20:31:21 +02:00
JohnDoee
636bd0edc2 fixed small bug with file releasing 2015-12-08 19:35:41 +01:00
JohnDoee
6bc72dbc44 Problem with filename encoding 2015-11-22 16:21:10 +01:00
JohnDoee
c85c5763d6 Listening to 0.0.0.0 instead of 127.0.0.1 when attempted address is not local 2015-11-22 16:20:58 +01:00
JohnDoee
bcc2067e55 Proper handling of name / not name in requested file 2015-11-22 16:20:37 +01:00
JohnDoee
1d31efa48c fixed small bug with unloading 2015-11-21 20:29:42 +01:00
JohnDoee
5859a19e0e Merge tag '0.5.0' into develop
Partial rewrite
2015-11-21 19:25:59 +01:00
JohnDoee
4290a0b821 Merge branch 'release/0.5.0' 2015-11-21 19:25:54 +01:00
JohnDoee
634219f0f8 added support for direct torrent streaming 2015-11-21 19:05:01 +01:00
JohnDoee
342b9f77a4 fixed bug related to path streaming 2015-11-18 11:40:11 +01:00
JohnDoee
8f03e719fa restructured plugin 2015-11-17 22:41:14 +01:00
JohnDoee
8c010abe16 Merge tag '0.4.1' into develop
Fixed old Deluge bug
2015-07-19 21:37:53 +02:00
JohnDoee
e92f9029a4 Merge branch 'release/0.4.1' 2015-07-19 21:37:44 +02:00
JohnDoee
a483bfe599 updated readme and bumped version 2015-07-19 21:37:34 +02:00
JohnDoee
f3ee7f4270 fixed bug with old deluge version 2015-07-19 21:34:01 +02:00
JohnDoee
91e8da6dfd Merge tag '0.4.0' into develop
WebUI support
2015-07-19 19:21:23 +02:00
JohnDoee
2e4b166528 Merge branch 'release/0.4.0' 2015-07-19 19:21:19 +02:00
JohnDoee
aaf70bbfc8 Updated Readme and bumped version 2015-07-19 19:21:09 +02:00
JohnDoee
52661abf52 Tried to improve scheduling algorithm and added WebUI support as requested in #2 2015-07-19 19:18:59 +02:00
JohnDoee
a9dae5bc90 Added download instructions to readme 2015-06-18 21:23:54 +02:00
8 changed files with 780 additions and 396 deletions

View File

@@ -11,6 +11,11 @@ the user to stream a file using HTTP.
Technically, it tries to download the part of a file the user requests and
downloads ahead, this enables seeking in video files.
## Where to download
You can download this release on Github. Look for the "releases" tab on the repository page.
Under that tab, eggs for Python 2.6 and 2.7 should exist.
## How to use
* Install plugin
@@ -32,12 +37,26 @@ The _allow remote_ option is to allow remote add and stream of torrents.
## ToDo
* Add support for the WebUI
* There are a few situations where an uncaught exception is thrown.
* Add feedback when preparing stream.
# Version Info
## Version 0.6.0
* Fixed URL encoding error
* Fixed "resume on complete" broken-ness (i hope)
* Changed default to not use stream urls
## Version 0.5.0
* Restructured the whole plugin
* Added support for StreamProtocol
## Version 0.4.1
* Fixed bug with old Deluge versions
## Version 0.4.0
* Added WebUI support
* Improved scheduling algorithm
## Version 0.3
* Fixed bug when streaming multiple files.
* Changed to try to prioritize end pieces more aggressively to not leave them hanging.
@@ -48,4 +67,4 @@ The _allow remote_ option is to allow remote add and stream of torrents.
* Improved buffering algorithm, not using only deadline anymore.
## Version 0.1
* Initial working release
* Initial working release

View File

@@ -42,7 +42,7 @@ from setuptools import setup
__plugin_name__ = "Streaming"
__author__ = "John Doee"
__author_email__ = "johndoee@tidalstream.org"
__version__ = "0.3.1"
__version__ = "0.6.0"
__url__ = "https://github.com/JohnDoee/deluge-streaming"
__license__ = "GPLv3"
__description__ = "Enables streaming of files while downloading them."

View File

@@ -40,24 +40,23 @@
import base64
import json
import logging
import math
import os
import urllib
import deluge.configmanager
from collections import defaultdict
from datetime import datetime, timedelta
from deluge import component
from deluge._libtorrent import lt
from deluge.core.rpcserver import export
from deluge.plugins.pluginbase import CorePluginBase
from twisted.internet import reactor, defer, task
from twisted.python import randbytes
from twisted.web import server, resource, static, http
from twisted.web.static import StaticProducer
import deluge.component as component
import deluge.configmanager
from deluge.core.rpcserver import export
from deluge.log import LOG as log
from deluge.plugins.pluginbase import CorePluginBase
from twisted.web import server, resource, static, http, client
from .filelike import FilelikeObjectResource
from .resource import Resource
logger = logging.getLogger(__name__)
@@ -67,22 +66,17 @@ DEFAULT_PREFS = {
'port': 46123,
'allow_remote': False,
'reset_complete': True,
'use_stream_urls': False,
'auto_open_stream_urls': False,
'remote_username': 'username',
'remote_password': 'password',
}
from .filelike import FilelikeObjectResource
MIN_QUEUE_CHUNKS = 6
EXPECTED_PERCENT = 0.3
EXPECTED_SIZE = 5*1024*1024
HANDLERS_TIMEOUT = timedelta(hours=12)
class UnknownTorrentException(Exception):
pass
class UnknownFileException(Exception):
pass
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, seconds)
return d
class FileServeResource(resource.Resource):
isLeaf = True
@@ -105,35 +99,12 @@ class FileServeResource(resource.Resource):
if key not in self.file_mapping:
return resource.NoResource().render(request)
v = self.file_mapping[key]
if isinstance(v, static.File):
return v.render_GET(request)
f = self.file_mapping[key]
if f.is_complete():
return static.File(f.full_path).render_GET(request)
else:
tf = v.copy()
tf.open()
return FilelikeObjectResource(tf, tf.size).render_GET(request)
class AddTorrentResource(Resource):
isLeaf = True
def __init__(self, client, *args, **kwargs):
self.client = client
Resource.__init__(self, *args, **kwargs)
@defer.inlineCallbacks
def render_POST(self, request):
torrent_data = request.args.get('torrent_data', None)
if not torrent_data:
defer.returnValue(json.dumps({'status': 'error', 'message': 'missing torrent_data in request'}))
torrent_data = torrent_data[0].encode('base64')
torrent_id = yield self.client.add_torrent(torrent_data)
if torrent_id is None:
defer.returnValue(json.dumps({'status': 'error', 'message': 'failed to add torrent'}))
defer.returnValue(json.dumps({'status': 'success', 'infohash': torrent_id, 'message': 'torrent added successfully'}))
tfr = f.open()
return FilelikeObjectResource(tfr, f.size).render_GET(request)
class StreamResource(Resource):
isLeaf = True
@@ -156,303 +127,415 @@ class StreamResource(Resource):
result = yield self.client.stream_torrent(infohash[0], path[0])
defer.returnValue(json.dumps(result))
class TorrentFile(object):
file_handler = None
def __init__(self, torrent_handler, file_path, torrent_file_path, size, chunk_size, offset, file_index):
self.torrent_handler = torrent_handler
self.file_path = self.path = file_path
self.torrent_file_path = torrent_file_path
self.first_chunk = offset / chunk_size
self.last_chunk = (offset + size) / chunk_size
self.chunk_size = chunk_size
self.offset = offset
self.file_index = file_index
self.size = size
self.last_requested_chunk = self.first_chunk
self.is_closed = False
self.is_active = False
self.last_activity = datetime.now()
self.end_chunks = []
class UnknownTorrentException(Exception):
pass
class UnknownFileException(Exception):
pass
class TorrentFileReader(object):
def __init__(self, torrent_file):
self.torrent_file = torrent_file
self.size = torrent_file.size
self.position = 0
self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset
self.torrent_handler.torrent_files[torrent_file_path].append(self)
def open(self):
if self.is_closed:
raise IOError('Unable to reopen file')
self.file_handler = open(self.file_path, 'rb')
def get_chunk(self, tell):
i = (tell + 1) - self.first_chunk_end
if i <= 0:
offset = 0
else:
offset = (i / self.chunk_size) + 1
return self.first_chunk + offset, self.first_chunk_end + (offset * self.chunk_size)
def prepare_torrent(self, buffer_pieces):
self.torrent_handler.schedule_chunk(self.first_chunk, 0)
self.torrent_handler.schedule_chunk(self.last_chunk, 0)
self.end_chunks.append(self.last_chunk)
if self.first_chunk != self.last_chunk:
self.torrent_handler.schedule_chunk(self.last_chunk-1, 0)
self.end_chunks.append(self.last_chunk-1)
for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:min(self.first_chunk+buffer_pieces, self.last_chunk)+1], self.first_chunk):
self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk)
self.waiting_for_piece = None
self.current_piece = None
self.current_piece_data = None
@defer.inlineCallbacks
def read(self, size=1024):
self.is_active = True
self.last_activity = datetime.now()
required_piece, read_position = self.torrent_file.get_piece_info(self.position)
tell = self.tell()
chunk, end_of_chunk = self.get_chunk(tell)
self.last_requested_chunk = chunk
if self.current_piece != required_piece:
logger.debug('We are missing piece %i and it is required, requesting' % (required_piece, ))
self.waiting_for_piece = required_piece
self.current_piece_data = yield self.torrent_file.get_piece_data(required_piece)
self.current_piece = required_piece
self.waiting_for_piece = None
logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell))
yield self.wait_chunk_complete(chunk)
logger.debug('done waiting %s, %s, %s' % (chunk, size, tell))
logger.debug('We can read from local piece from %s size %s from position %s' % (read_position, size, self.position))
data = self.current_piece_data[read_position:read_position+size]
self.position += len(data)
defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size)))
def wait_chunk_complete(self, chunk):
d = defer.Deferred()
def check_if_done():
if self.torrent_handler.torrent.status.pieces[chunk]:
return d.callback(True)
self.torrent_handler.schedule_chunk(chunk, 0)
if self.is_closed:
logger.debug('The file closed, shutting down torrent')
return
reactor.callLater(1.0, check_if_done)
check_if_done()
return d
def seek(self, offset, whence=os.SEEK_SET):
return self.file_handler.seek(offset, whence)
defer.returnValue(data)
def tell(self):
return self.file_handler.tell()
return self.position
def close(self):
if not self.is_closed and self in self.torrent_handler.torrent_files[self.torrent_file_path]:
self.torrent_handler.torrent_files[self.torrent_file_path].remove(self)
self.is_closed = True
if self.file_handler:
return self.file_handler.close()
self.torrent_file.close(self)
def copy(self):
tf = TorrentFile(self.torrent_handler, self.file_path, self.torrent_file_path,
self.size, self.chunk_size, self.offset, self.file_index)
return tf
def seek(self, offset, whence=os.SEEK_SET):
self.position = offset
class TorrentHandler(object):
def __init__(self, torrent, torrent_id, core):
class TorrentFile(object): # can be read from, knows about itself
def __init__(self, torrent, first_piece, last_piece, piece_size, offset, path, full_path, size, index):
self.torrent = torrent
self.torrent_handle = torrent.handle
self.torrent_id = torrent_id
self.core = core
self.priorities = [0] * len(torrent.get_status(['files'])['files'])
self.torrent_files = defaultdict(list)
self.priorities_increased = {}
# need to blackhole all pieces not downloaded yet
self.last_activity = datetime.now()
self.update_chunk_priorities()
self.first_piece = first_piece
self.last_piece = last_piece
self.piece_size = piece_size
self.offset = offset
self.path = path
self.size = size
self.full_path = full_path
self.index = index
self.file_requested = False
self.do_shutdown = False
self.first_piece_end = self.piece_size * (self.first_piece + 1) - offset
self.waiting_pieces = {}
self.current_readers = []
self.registered_alert = False
self.alerts = component.get("AlertManager")
def is_buffered(self, first_chunk, last_chunk):
buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x]
logger.debug('Current buffer status for file (%s): %r' % (first_chunk, buffer_status))
if buffer_status:
return False
else:
return True
def open(self):
"""
Returns a filelike object
"""
if not self.registered_alert:
self.alerts.register_handler("read_piece_alert", self.on_alert_got_piece_data)
self.registered_alert = True
tfr = TorrentFileReader(self)
self.current_readers.append(tfr)
self.file_requested = False
return tfr
def update_priorities(self):
self.torrent.set_file_priorities(self.priorities)
def close(self, tfr):
self.current_readers.remove(tfr)
self.torrent.unprioritize_pieces(tfr)
if not self.current_readers:
self.torrent.unprioritize_pieces(self)
def schedule_chunk(self, chunk, distance):
if chunk not in self.priorities_increased:
logger.debug('Scheduled chunk %s at distance %s' % (chunk, distance))
self.torrent_handle.piece_priority(chunk, (7 if distance <= 4 else 6))
self.torrent_handle.set_piece_deadline(chunk, 700*(distance+1))
self.priorities_increased[chunk] = True
def is_complete(self):
torrent_status = self.torrent.torrent.get_status(['file_progress', 'state'])
file_progress = torrent_status['file_progress']
return file_progress[self.index] == 1.0
def update_chunk_priority(self, tfs):
handled_heads = []
for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True):
if datetime.now() - tf.last_activity > HANDLERS_TIMEOUT:
logger.debug('Torrentfile timed out')
tf.close()
continue
if tf.last_requested_chunk is not None:
if handled_heads:
if tf.last_requested_chunk in handled_heads:
continue
already_queued = False
for i in sorted(handled_heads):
if i > tf.last_requested_chunk:
if 0 not in self.torrent.status.pieces[tf.last_requested_chunk:i]:
already_queued = True
break
if already_queued:
continue
offset = tf.last_requested_chunk + 1
current_buffer_offset = 5
for chunk in tf.end_chunks:
if not self.torrent.status.pieces[chunk]:
logger.info('End chunks are not downloaded, setting buffer offset differently')
current_buffer_offset = 20
break
status_increase_count = 0
current_buffer = 0
found_buffer_end = False
for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:tf.last_chunk+1], offset):
if not chunk_status:
if not found_buffer_end:
logger.debug('Found buffer end at %s, have a buffer of %s' % (chunk, current_buffer))
found_buffer_end = True
if status_increase_count <= MIN_QUEUE_CHUNKS:
self.schedule_chunk(chunk, chunk-offset)
elif self.torrent_handle.piece_priority(chunk) == 0:
self.torrent_handle.piece_priority(chunk, 1)
status_increase_count += 1
elif not found_buffer_end:
current_buffer += 1
if status_increase_count >= max(MIN_QUEUE_CHUNKS, current_buffer-current_buffer_offset):
break
handled_heads.append(tf.last_requested_chunk)
return bool(handled_heads)
def update_chunk_priorities(self): # TODO: check if torrent still exists
file_progress = self.torrent.get_status(['file_progress'])['file_progress']
incomplete_files = False
for torrent_file_path, tfs in self.torrent_files.items():
if not tfs:
logger.debug('No heads left for %r' % torrent_file_path)
del self.torrent_files[torrent_file_path]
continue
tf = tfs[0]
if file_progress[tf.file_index] == 1.0:
logger.info('%s is already complete, skipping' % torrent_file_path)
continue
if self.update_chunk_priority(tfs):
self.last_activity = datetime.now()
incomplete_files = True
if not incomplete_files and self.core.config['reset_complete'] and not all(self.priorities):
logger.info('We are not doing any file streamings, but not downloading all files, changing that.')
self.priorities = [1] * len(self.priorities)
self.update_priorities()
if datetime.now() - self.last_activity > HANDLERS_TIMEOUT:
logger.debug('Torrent handler idle, killing myself.')
if self.torrent_id in self.core.torrent_handlers:
del self.core.torrent_handlers[self.torrent_id]
def get_piece_info(self, tell):
return divmod((self.offset + tell), self.piece_size)
def on_alert_got_piece_data(self, alert):
torrent_id = str(alert.handle.info_hash())
if torrent_id != self.torrent.infohash:
return
reactor.callLater(2, self.update_chunk_priorities)
def blackhole_all_pieces(self, first_chunk, last_chunk):
for chunk in range(first_chunk, last_chunk+1):
self.torrent_handle.piece_priority(chunk, 0)
logger.debug('Got piece data for piece %s' % alert.piece)
if alert.piece not in self.waiting_pieces:
logger.debug('Got data for piece %i, but no data needed for this piece?' % alert.piece)
return
# TODO: check piece size is not zero
self.waiting_pieces[alert.piece].callback(alert.buffer)
@defer.inlineCallbacks
def get_file(self, filepath):
status = self.torrent.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path'])
pieces = self.torrent.status.pieces
piece_length = status['piece_length']
files = status['files']
def get_piece_data(self, piece):
logger.debug('Trying to get piece data for piece %s' % piece)
for reader in self.current_readers:
if reader.current_piece == piece:
defer.returnValue(reader.current_piece_data)
for f, priority, progress in zip(files, status['file_priorities'], status['file_progress']):
if f['path'] == filepath:
f['first_piece'] = f['offset'] / piece_length
f['last_piece'] = (f['offset'] + f['size']) / piece_length
f['pieces'] = pieces[f['first_piece']:f['last_piece']+1]
f['priority'] = priority
f['progress'] = progress
break
else:
raise UnknownFileException()
if piece not in self.waiting_pieces:
self.waiting_pieces[piece] = defer.Deferred()
fp = os.path.join(status['save_path'], f['path'])
if progress == 1: # file is complete, no need to fire up all the torrent jazz
defer.returnValue(static.File(fp))
self.priorities = [0] * len(self.priorities)
self.priorities[f['index']] = 3
current_tfs = []
for tfs in self.torrent_files.values():
if not tfs:
continue
tf = tfs[0]
self.priorities[tf.file_index] = 3
current_tfs.append((tf, self.torrent_handle.piece_priorities()[tf.first_chunk:tf.last_chunk+1]))
self.update_priorities()
for tf, chunk_status in current_tfs:
logger.info('Setting chunks to old status')
for i, chunk in enumerate(chunk_status, tf.first_chunk):
logger.debug('Setting status on chunk %s back to %s' % (i, chunk))
self.torrent_handle.piece_priority(i, chunk)
self.torrent.resume()
percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT))
size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces']))
expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream.
tf = TorrentFile(self, fp, f['path'], f['size'], status['piece_length'], f['offset'], f['index'])
if len(self.torrent_files[f['path']]) == 1:
self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk)
tf.prepare_torrent(expected_pieces)
for _ in range(300):
if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces) and self.is_buffered(tf.last_chunk, tf.last_chunk):
break
logger.debug('Waiting for %s' % piece)
self.torrent.schedule_piece(self, self, piece, 0)
while not self.torrent.torrent.handle.have_piece(piece):
if self.do_shutdown:
raise Exception()
logger.debug('Did not have piece %i, waiting' % piece)
self.torrent.unrelease()
yield sleep(1)
defer.returnValue(tf)
self.torrent.torrent.handle.read_piece(piece)
data = yield self.waiting_pieces[piece]
if piece in self.waiting_pieces:
del self.waiting_pieces[piece]
logger.debug('Done waiting for piece %i, returning data' % piece)
defer.returnValue(data)
def shutdown(self):
self.do_shutdown = True
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, seconds)
return d
class Torrent(object):
def __init__(self, torrent_handler, infohash):
self.infohash = infohash
self.torrent = component.get("TorrentManager").torrents.get(infohash, None)
self.torrent_handler = torrent_handler
if not self.torrent:
raise UnknownTorrentException('%s is not a known infohash' % infohash)
self.torrent_files = None
self.priority_increased = defaultdict(set)
self.do_shutdown = False
self.torrent_released = True # set to True if all the files are set to download
self.populate_files()
self.file_priorities = [0] * len(self.torrent_files)
self.last_piece = self.torrent_files[-1].last_piece
self.torrent.handle.set_sequential_download(True)
reactor.callLater(0, self.update_piece_priority)
reactor.callLater(0, self.blackhole_all_pieces, 0, self.last_piece)
def is_unstreamed(self): # check if anyone streams this file
for tf in self.torrent_files:
if tf.current_readers:
return True
return False
def populate_files(self):
self.torrent_files = []
status = self.torrent.get_status(['piece_length', 'files', 'save_path'])
piece_length = status['piece_length']
files = status['files']
save_path = status['save_path']
for f in files:
first_piece = f['offset'] / piece_length
last_piece = (f['offset'] + f['size']) / piece_length
full_path = os.path.join(save_path, f['path'])
self.torrent_files.append(TorrentFile(self, first_piece, last_piece, piece_length, f['offset'],
f['path'], full_path, f['size'], f['index']))
return files
def find_file(self, file_or_index=None, includes_name=False):
best_file = None
biggest_file_size = 0
for i, f in enumerate(self.torrent_files):
path = f.path
if not includes_name and '/' in path:
path = '/'.join(path.split('/')[1:])
logger.debug('Testing file %r against %s / %r' % (file_or_index, i, path))
if file_or_index is not None:
if i == file_or_index or path == file_or_index:
best_file = f
break
else:
if f.size > biggest_file_size:
best_file = f
biggest_file_size = f.size
return best_file
def get_file(self, file_or_index=None, includes_name=False):
f = self.find_file(file_or_index, includes_name)
if f is None:
raise UnknownFileException('Was unable to find %s' % file_or_index)
return f
def unprioritize_pieces(self, tfr):
logger.debug('Unprioritizing pieces for %s' % tfr)
if self.torrent_handler.config['reset_complete'] and self.is_unstreamed():
logger.debug('Not unprioritizing pieces because there are no streams and it would stop the file (causes problems)')
return
currently_downloading = self.get_currently_downloading()
for piece, increased_by in self.priority_increased.items():
if tfr in increased_by:
increased_by.remove(tfr)
if not increased_by and piece not in currently_downloading and not self.torrent.status.pieces[piece]:
logger.debug('Unprioritizing piece %s' % piece)
self.torrent.handle.piece_priority(piece, 0)
def get_currently_downloading(self):
currently_downloading = set()
for peer in self.torrent.handle.get_peer_info():
if peer.downloading_piece_index != -1:
currently_downloading.add(peer.downloading_piece_index)
return currently_downloading
def blackhole_all_pieces(self, first_piece, last_piece):
currently_downloading = self.get_currently_downloading()
logger.debug('Blacklisting pieces from %i to %i skipping %r' % (first_piece, last_piece, currently_downloading))
for piece in range(first_piece, last_piece+1):
if piece not in currently_downloading and not self.torrent.status.pieces[piece]:
if piece in self.priority_increased:
continue
#logger.debug('Setting piece priority %s to blacklist' % piece)
self.torrent.handle.piece_priority(piece, 0)
def unrelease(self):
if self.torrent_released:
logger.debug('Unreleasing %s' % self.infohash)
self.torrent_released = False
self.torrent.set_file_priorities(self.file_priorities)
self.blackhole_all_pieces(0, self.last_piece)
def get_torrent_file(self, file_or_index, includes_name):
f = self.get_file(file_or_index, includes_name)
f.file_requested = True
self.torrent.resume()
should_update_priorities = False
if self.file_priorities[f.index] == 0:
self.file_priorities[f.index] = 3
should_update_priorities = True
if self.torrent_released:
should_update_priorities = True
if should_update_priorities and not f.is_complete(): # Need to do this stuff on seek too
self.unrelease()
return f
def shutdown(self):
logger.info('Shutting down torrent %s' % self.infohash)
self.do_shutdown = True
for tf in self.torrent_files:
tf.shutdown()
def schedule_piece(self, torrent_file, schedule_target, piece, distance):
if schedule_target not in self.priority_increased[piece]:
if not self.priority_increased[piece]:
logger.debug('Scheduled piece %s at distance %s' % (piece, distance))
self.torrent.handle.piece_priority(piece, (7 if distance <= 4 else 6))
self.torrent.handle.set_piece_deadline(piece, 700*(distance+1))
self.priority_increased[piece].add(schedule_target)
def do_pieces_schedule(self, torrent_file, schedule_target, currently_downloading, from_piece):
logger.debug('Looking for stuff to do with pieces for file %s from piece %s' % (torrent_file, from_piece))
priority_increased = 0
chain_size = 0
download_chain_size = 0
end_of_chain = False
current_buffer_offset = 5
if self.torrent.status.pieces[torrent_file.last_piece]:
if torrent_file.first_piece != torrent_file.last_piece and self.torrent.status.pieces[torrent_file.last_piece-1] \
or torrent_file.first_piece == torrent_file.last_piece:
current_buffer_offset = 20
for piece, status in enumerate(self.torrent.status.pieces[from_piece:torrent_file.last_piece+1], from_piece):
if not end_of_chain:
if status:
chain_size += 1
elif piece in currently_downloading:
download_chain_size += 1
if not status and piece not in currently_downloading:
if not end_of_chain:
status_increase = max(11, chain_size-current_buffer_offset)
end_of_chain = True
priority_increased += 1
if priority_increased >= status_increase:
logger.debug('Done increasing priority for %i pieces' % status_increase)
break
self.schedule_piece(torrent_file, schedule_target, piece, piece-from_piece)
else:
logger.info('We are done with the rest of this chain, we might be able to increase others')
return True
return False
def update_piece_priority(self): # if all do_pieces_schedule returns true, allow all pices of file to be downloaded or whole torernt
if self.do_shutdown:
return
logger.debug('Updating piece priority for %s' % self.infohash)
currently_downloading = set()
for peer in self.torrent.handle.get_peer_info():
if peer.downloading_piece_index != -1:
currently_downloading.add(peer.downloading_piece_index)
all_heads_done = True
for f in self.torrent_files:
if not f.file_requested and not f.current_readers:
continue
logger.debug('Rescheduling file %s' % f.path)
if f.file_requested:
all_heads_done &= self.do_pieces_schedule(f, f, currently_downloading, f.first_piece)
self.schedule_piece(f, f, f.last_piece, 0)
if f.first_piece != f.last_piece:
self.schedule_piece(f, f, f.last_piece-1, 1)
for tfr in f.current_readers:
if tfr.waiting_for_piece is not None:
logger.debug('Scheduling based on waiting for piece %s' % tfr.waiting_for_piece)
all_heads_done &= self.do_pieces_schedule(f, tfr, currently_downloading, tfr.waiting_for_piece)
elif tfr.current_piece is not None:
logger.debug('Scheduling based on current piece %s' % tfr.current_piece)
all_heads_done &= self.do_pieces_schedule(f, tfr, currently_downloading, tfr.current_piece)
if all(self.torrent.status.pieces):
logger.debug('All pieces complete, no need to loop')
return
if all_heads_done and not self.torrent_released:
logger.debug('We are already done with all heads, figuring out what to do next')
if self.torrent_handler.config['reset_complete']:
self.torrent_released = True
logger.debug('Resetting all disabled files')
file_priorities = [(1 if fp == 0 else fp) for fp in self.file_priorities]
self.torrent.set_file_priorities(file_priorities)
if not all_heads_done and self.torrent_released:
logger.debug('Seems like the torrent was released too early')
self.unrelease()
reactor.callLater(0.3, self.update_piece_priority)
class TorrentHandler(object):
def __init__(self, config):
self.torrents = {}
self.config = config
self.alerts = component.get("AlertManager")
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
def get_stream(self, infohash, file_or_index=None, includes_name=False):
logger.info('Trying to stream infohash %s and file %s include_name %s' % (infohash, file_or_index, includes_name))
if infohash not in self.torrents:
self.torrents[infohash] = Torrent(self, infohash)
return self.torrents[infohash].get_torrent_file(file_or_index, includes_name)
def on_alert_torrent_removed(self, alert):
try:
torrent_id = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on torrent remove alert')
return
if torrent_id not in self.torrents:
return
self.torrents[torrent_id].shutdown()
del self.torrents[torrent_id]
def shutdown(self):
logger.debug('Shutting down TorrentHandler')
self.alerts.deregister_handler(self.on_alert_torrent_removed)
for torrent in self.torrents.values():
torrent.shutdown()
class Core(CorePluginBase):
def enable(self):
@@ -462,30 +545,31 @@ class Core(CorePluginBase):
self.resource = Resource()
self.resource.putChild('file', self.fsr)
if self.config['allow_remote']:
self.resource.putChild('add_torrent', AddTorrentResource(username=self.config['remote_username'],
password=self.config['remote_password'],
client=self))
self.resource.putChild('stream', StreamResource(username=self.config['remote_username'],
password=self.config['remote_password'],
client=self))
self.site = server.Site(self.resource)
session = component.get("Core").session
settings = session.get_settings()
settings['prioritize_partial_pieces'] = True
session.set_settings(settings)
try:
session = component.get("Core").session
settings = session.get_settings()
settings['prioritize_partial_pieces'] = True
session.set_settings(settings)
except AttributeError:
logger.warning('Unable to exclude partial pieces')
self.torrent_handlers = {}
self.torrent_handler = TorrentHandler(self.config)
try:
self.listening = reactor.listenTCP(self.config['port'], self.site, interface=self.config['ip'])
except:
self.listening = reactor.listenTCP(self.config['port'], self.site, interface='127.0.0.1')
self.listening = reactor.listenTCP(self.config['port'], self.site, interface='0.0.0.0')
@defer.inlineCallbacks
def disable(self):
self.site.stopFactory()
self.torrent_handler.shutdown()
yield self.listening.stopListening()
def update(self):
@@ -495,7 +579,6 @@ class Core(CorePluginBase):
@defer.inlineCallbacks
def set_config(self, config):
"""Sets the config dictionary"""
do_reload = False
for key in config.keys():
self.config[key] = config[key]
self.config.save()
@@ -510,43 +593,36 @@ class Core(CorePluginBase):
@export
@defer.inlineCallbacks
def add_torrent(self, torrent_data):
core = component.get("Core")
tid = yield core.add_torrent_file('file.torrent', torrent_data, {'add_paused': True})
def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None, includes_name=False):
tor = component.get("TorrentManager").torrents.get(infohash, None)
tor = component.get("TorrentManager").torrents.get(tid, None)
state = tor.get_status(['files'])
tor.set_file_priorities([0] * len(state['files']))
defer.returnValue(tid)
def get_torrent_handler(self, tid):
if tid not in self.torrent_handlers:
tor = component.get("TorrentManager").torrents.get(tid, None)
if tor is None:
logger.info('Did not find torrent, must add it')
if tor is None:
raise UnknownTorrentException()
if not filedump and url:
filedump = yield client.getPage(url)
self.torrent_handlers[tid] = TorrentHandler(tor, tid, self)
if not filedump:
defer.returnValue({'status': 'error', 'message': 'unable to find torrent, provide infohash, url or filedump'})
torrent_info = lt.torrent_info(lt.bdecode(filedump))
infohash = str(torrent_info.info_hash())
return self.torrent_handlers[tid]
@export
@defer.inlineCallbacks
def stream_torrent(self, tid, filepath):
core = component.get("Core")
try:
yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True})
except:
defer.returnValue({'status': 'error', 'message': 'failed to add torrent'})
try:
torrent_handler = yield self.get_torrent_handler(tid)
except UnknownTorrentException: # torrent isn't added yet
defer.returnValue({'status': 'error', 'message': 'torrent_not_found'})
try:
tf = yield torrent_handler.get_file(filepath)
except UnknownFileException:
defer.returnValue({'status': 'error', 'message': 'file_not_found'})
tf = self.torrent_handler.get_stream(infohash, filepath_or_index, includes_name)
except UnknownTorrentException:
defer.returnValue({'status': 'error', 'message': 'unable to find torrent, probably failed to add it'})
defer.returnValue({
'status': 'success',
'use_stream_urls': self.config['use_stream_urls'],
'auto_open_stream_urls': self.config['auto_open_stream_urls'],
'url': 'http://%s:%s/file/%s/%s' % (self.config.config['ip'], self.config.config['port'],
self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path)))
self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path).encode('utf-8')))
})

View File

@@ -33,6 +33,28 @@
</packing>
</child>
<child>
<widget class="GtkLabel" id="label_use_stream_urls">
<property name="visible">True</property>
<property name="xalign">0</property>
<property name="label" translatable="yes">Use stream urls:</property>
</widget>
<packing>
<property name="position">2</property>
</packing>
</child>
<child>
<widget class="GtkLabel" id="label_auto_open_stream_urls">
<property name="visible">True</property>
<property name="xalign">0</property>
<property name="label" translatable="yes">Auto-open stream urls:</property>
</widget>
<packing>
<property name="position">3</property>
</packing>
</child>
<child>
<widget class="GtkLabel" id="label_reset_complete">
<property name="visible">True</property>
@@ -40,7 +62,7 @@
<property name="label" translatable="yes">Reset "do not download" when streamed file is complete:</property>
</widget>
<packing>
<property name="position">2</property>
<property name="position">4</property>
</packing>
</child>
@@ -51,7 +73,7 @@
<property name="label" translatable="yes">Allow remote control:</property>
</widget>
<packing>
<property name="position">3</property>
<property name="position">5</property>
</packing>
</child>
@@ -62,7 +84,7 @@
<property name="label" translatable="yes">Remote username:</property>
</widget>
<packing>
<property name="position">4</property>
<property name="position">6</property>
</packing>
</child>
@@ -73,7 +95,7 @@
<property name="label" translatable="yes">Remote password:</property>
</widget>
<packing>
<property name="position">5</property>
<property name="position">7</property>
</packing>
</child>
@@ -108,7 +130,7 @@
</child>
<child>
<widget class="GtkCheckButton" id="input_reset_complete">
<widget class="GtkCheckButton" id="input_use_stream_urls">
<property name="visible">True</property>
<property name="can_focus">True</property>
</widget>
@@ -118,7 +140,7 @@
</child>
<child>
<widget class="GtkCheckButton" id="input_allow_remote">
<widget class="GtkCheckButton" id="input_auto_open_stream_urls">
<property name="visible">True</property>
<property name="can_focus">True</property>
</widget>
@@ -128,7 +150,7 @@
</child>
<child>
<widget class="GtkEntry" id="input_remote_username">
<widget class="GtkCheckButton" id="input_reset_complete">
<property name="visible">True</property>
<property name="can_focus">True</property>
</widget>
@@ -137,6 +159,26 @@
</packing>
</child>
<child>
<widget class="GtkCheckButton" id="input_allow_remote">
<property name="visible">True</property>
<property name="can_focus">True</property>
</widget>
<packing>
<property name="position">5</property>
</packing>
</child>
<child>
<widget class="GtkEntry" id="input_remote_username">
<property name="visible">True</property>
<property name="can_focus">True</property>
</widget>
<packing>
<property name="position">6</property>
</packing>
</child>
<child>
<widget class="GtkEntry" id="input_remote_password">
<property name="visible">True</property>
@@ -144,13 +186,13 @@
<property name="can_focus">True</property>
</widget>
<packing>
<property name="position">5</property>
<property name="position">7</property>
</packing>
</child>
</widget>
<packing>
<property name="position">1</property>
<property name="position">7</property>
</packing>
</child>
</widget>

View File

@@ -31,20 +31,179 @@ Copyright:
statement from all source files in the program, then also delete it here.
*/
PreferencePage = Ext.extend(Ext.Panel, {
title: 'Streaming',
border: false,
layout: 'form',
initComponent: function() {
PreferencePage.superclass.initComponent.call(this);
var om = this.optionsManager = new Deluge.OptionsManager();
this.on('show', this.onPageShow, this);
var fieldset = this.add({
xtype: 'fieldset',
border: false,
title: 'Streaming',
style: 'margin-bottom: 0px; padding-bottom: 0px; padding-top: 5px',
autoHeight: true,
labelWidth: 110,
defaultType: 'textfield',
defaults: {
width: 180,
}
});
om.bind('port', fieldset.add({
name: 'port',
fieldLabel: _('Port'),
decimalPrecision: 0,
minValue: -1,
maxValue: 99999
}));
om.bind('ip', fieldset.add({
name: 'ip',
fieldLabel: 'IP'
}));
om.bind('use_stream_urls', fieldset.add({
xtype: 'checkbox',
name: 'use_stream_urls',
fieldLabel: 'Use StreamProtocol urls',
}));
om.bind('auto_open_stream_urls', fieldset.add({
xtype: 'checkbox',
name: 'auto_open_stream_urls',
fieldLabel: 'AutoOpen StreamProtocol urls',
}));
om.bind('reset_complete', fieldset.add({
xtype: 'checkbox',
name: 'reset_complete',
fieldLabel: 'Reset "do not download" when streamed file is complete',
}));
om.bind('allow_remote', fieldset.add({
xtype: 'checkbox',
name: 'allow_remote',
fieldLabel: 'Allow remote control checkbox',
}));
om.bind('remote_username', fieldset.add({
name: 'remote_username',
fieldLabel: 'Remote username'
}));
om.bind('remote_password', fieldset.add({
name: 'remote_password',
inputType: 'password',
fieldLabel: 'Remote password'
}));
},
onApply: function() {
var changed = this.optionsManager.getDirty();
if (!Ext.isObjectEmpty(changed)) {
deluge.client.streaming.set_config(changed, {
success: this.onSetConfig,
scope: this
});
for (var key in deluge.config) {
deluge.config[key] = this.optionsManager.get(key);
}
}
},
onSetConfig: function() {
this.optionsManager.commit();
},
onGotConfig: function(config) {
this.optionsManager.set(config);
},
onPageShow: function() {
deluge.client.streaming.get_config({
success: this.onGotConfig,
scope: this
})
}
});
StreamingPlugin = Ext.extend(Deluge.Plugin, {
constructor: function(config) {
config = Ext.apply({
name: "Streaming"
}, config);
StreamingPlugin.superclass.constructor.call(this, config);
},
'name': 'Streaming',
onDisable: function() {
deluge.menus.filePriorities.remove('streamthis');
deluge.preferences.selectPage(_('Plugins'));
deluge.preferences.removePage(this.prefsPage);
this.prefsPage.destroy();
},
onEnable: function() {
onEnable: function() {
this.prefsPage = new PreferencePage();
deluge.preferences.addPage(this.prefsPage);
console.log('Streaming plugin loaded');
var doStream = function (tid, fileIndex) {
deluge.client.streaming.stream_torrent(tid, null, null, fileIndex, true, {
success: function (result) {
console.log('Got result', result);
if (result.status == 'success') {
var url = result.url;
if (result.use_stream_urls) {
url = 'stream+' + url;
if (result.auto_open_stream_urls) {
window.location.assign(url);
return;
}
}
Ext.Msg.alert('Stream ready', 'URL for stream: <a target="_blank" href="' + url + '">' + url + '</a>');
} else {
Ext.Msg.alert('Stream failed', 'Error message: ' + result.message);
}
}
})
}
deluge.menus.filePriorities.addMenuItem({
id: 'streamthis',
text: 'Stream this file',
iconCls: 'icon-down',
handler: function (item, event) {
deluge.menus.filePriorities.hide();
var files = deluge.details.items.items[2];
var nodes = files.getSelectionModel().getSelectedNodes();
if (nodes) {
var fileIndex = nodes[0].attributes.fileIndex;
var tid = files.torrentId;
if (fileIndex >= 0) {
doStream(tid, fileIndex);
}
}
return false;
}
});
deluge.menus.torrent.addMenuItem({
id: 'streamthistorrent',
text: 'Stream this torrent',
iconCls: 'icon-down',
handler: function (item, event) {
deluge.menus.torrent.hide();
var ids = deluge.torrents.getSelectedIds();
if (ids) {
doStream(ids[0]);
}
return false;
}
});
}
});
new StreamingPlugin();
Deluge.registerPlugin('Streaming', StreamingPlugin);

View File

@@ -12,7 +12,12 @@ class NoRangeStaticProducer(static.NoRangeStaticProducer):
def resumeProducing(self):
if not self.request:
return
data = yield defer.maybeDeferred(self.fileObject.read, self.bufferSize)
if not self.request:
return
if data:
# this .write will spin the reactor, calling .doWrite and then
# .resumeProducing again, so be prepared for a re-entrant call
@@ -27,13 +32,19 @@ class SingleRangeStaticProducer(static.SingleRangeStaticProducer):
def resumeProducing(self):
if not self.request:
return
data = yield defer.maybeDeferred(self.fileObject.read,
min(self.bufferSize, self.size - self.bytesWritten))
if not self.request:
return
if data:
self.bytesWritten += len(data)
# this .write will spin the reactor, calling .doWrite and then
# .resumeProducing again, so be prepared for a re-entrant call
self.request.write(data)
if self.request and self.bytesWritten == self.size:
self.request.unregisterProducer()
self.request.finish()
@@ -44,6 +55,7 @@ class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer):
def resumeProducing(self):
if not self.request:
return
data = []
dataLength = 0
done = False
@@ -64,10 +76,16 @@ class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer):
except StopIteration:
done = True
break
if not self.request:
return
self.request.write(''.join(data))
if done:
self.request.unregisterProducer()
self.request.finish()
self.stopProducing()
self.request = None
class FilelikeObjectResource(static.File):
@@ -141,6 +159,7 @@ class FilelikeObjectResource(static.File):
producer = self.makeProducer(request, self.fileObject)
if request.method == 'HEAD':
self.fileObject.close()
return ''
producer.start()

View File

@@ -37,7 +37,9 @@
# statement from all source files in the program, then also delete it here.
#
import json
import gtk
import webbrowser
from deluge.log import LOG as log
from deluge.ui.client import client
@@ -46,8 +48,41 @@ from deluge.plugins.pluginbase import GtkPluginBase
import deluge.component as component
import deluge.common
from twisted.internet import reactor, defer, threads
from twisted.web import server, resource
from common import get_resource
class LocalAddResource(resource.Resource):
gtkui = None
isLeaf = True
def __init__(self, gtkui):
self.gtkui = gtkui
resource.Resource.__init__(self)
def render_GET(self, request):
useragent = request.getHeader('User-Agent')
if 'Deluge-Streamer' not in useragent:
request.setResponseCode(401)
return 'Unauthorized'
torrent_url = request.args.get('url', None)
if not torrent_url:
return json.dumps({'status': 'error', 'message': 'missing url in request'})
torrent_file = request.args.get('file', None)
if torrent_file:
torrent_file = torrent_file[0]
infohash = request.args.get('infohash', None)
if infohash:
infohash = infohash[0]
client.streaming.stream_torrent(url=torrent_url[0], infohash=infohash, filepath_or_index=torrent_file).addCallback(self.gtkui.stream_ready)
return json.dumps({'status': 'ok', 'message': 'queued'})
class GtkUI(GtkPluginBase):
def enable(self):
self.glade = gtk.glade.XML(get_resource("config.glade"))
@@ -67,7 +102,24 @@ class GtkUI(GtkPluginBase):
self.sep.show()
self.item.show()
torrentmenu = component.get("MenuBar").torrentmenu
self.sep_torrentmenu = gtk.SeparatorMenuItem()
self.item_torrentmenu = gtk.MenuItem(_("_Stream this torrent"))
self.item_torrentmenu.connect("activate", self.on_torrentmenu_menuitem_stream)
torrentmenu.append(self.sep_torrentmenu)
torrentmenu.append(self.item_torrentmenu)
self.sep_torrentmenu.show()
self.item_torrentmenu.show()
self.resource = LocalAddResource(self)
self.site = server.Site(self.resource)
self.listening = reactor.listenTCP(40747, self.site, interface='127.0.0.1')
@defer.inlineCallbacks
def disable(self):
component.get("Preferences").remove_page("Streaming")
component.get("PluginManager").deregister_hook("on_apply_prefs", self.on_apply_prefs)
@@ -77,12 +129,22 @@ class GtkUI(GtkPluginBase):
file_menu.remove(self.item)
file_menu.remove(self.sep)
torrentmenu = component.get("MenuBar").torrentmenu
torrentmenu.remove(self.item_torrentmenu)
torrentmenu.remove(self.sep_torrentmenu)
self.site.stopFactory()
yield self.listening.stopListening()
def on_apply_prefs(self):
log.debug("applying prefs for Streaming")
config = {
"ip": self.glade.get_widget("input_ip").get_text(),
"port": int(self.glade.get_widget("input_port").get_text()),
"use_stream_urls": self.glade.get_widget("input_use_stream_urls").get_active(),
"auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(),
"allow_remote": self.glade.get_widget("input_allow_remote").get_active(),
"reset_complete": self.glade.get_widget("input_reset_complete").get_active(),
"remote_username": self.glade.get_widget("input_remote_username").get_text(),
@@ -98,11 +160,26 @@ class GtkUI(GtkPluginBase):
"callback for on show_prefs"
self.glade.get_widget("input_ip").set_text(config["ip"])
self.glade.get_widget("input_port").set_text(str(config["port"]))
self.glade.get_widget("input_use_stream_urls").set_active(config["use_stream_urls"])
self.glade.get_widget("input_auto_open_stream_urls").set_active(config["auto_open_stream_urls"])
self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"])
self.glade.get_widget("input_reset_complete").set_active(config["reset_complete"])
self.glade.get_widget("input_remote_username").set_text(config["remote_username"])
self.glade.get_widget("input_remote_password").set_text(config["remote_password"])
def stream_ready(self, result):
if result['status'] == 'success':
if result.get('use_stream_urls', False):
url = "stream+%s" % result['url']
if result.get('auto_open_stream_urls', False):
threads.deferToThread(webbrowser.open, url)
else:
dialogs.InformationDialog('Stream ready', '<a href="%s">Click here to open it</a>' % url).run()
else:
dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run()
else:
dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run()
def on_menuitem_stream(self, data=None):
torrent_id = component.get("TorrentView").get_selected_torrents()[0]
@@ -113,13 +190,11 @@ class GtkUI(GtkPluginBase):
for path in paths:
selected.append(ft.treestore.get_iter(path))
def stream_ready(result):
if result['status'] == 'success':
dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run()
else:
dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run()
for select in selected:
path = ft.get_file_path(select)
client.streaming.stream_torrent(torrent_id, path).addCallback(stream_ready)
break
client.streaming.stream_torrent(infohash=torrent_id, filepath_or_index=path, includes_name=True).addCallback(self.stream_ready)
break
def on_torrentmenu_menuitem_stream(self, data=None):
torrent_id = component.get("TorrentView").get_selected_torrents()[0]
client.streaming.stream_torrent(infohash=torrent_id).addCallback(self.stream_ready)

View File

@@ -46,10 +46,4 @@ from common import get_resource
class WebUI(WebPluginBase):
scripts = [get_resource("streaming.js")]
def enable(self):
pass
def disable(self):
pass
scripts = [get_resource("streaming.js")]