diff --git a/streaming/core.py b/streaming/core.py
index 9266d7d..6e41904 100644
--- a/streaming/core.py
+++ b/streaming/core.py
@@ -40,25 +40,23 @@
import base64
import json
import logging
-import math
import os
-import time
import urllib
+import deluge.configmanager
+
from collections import defaultdict
-from datetime import datetime, timedelta
+
+from deluge import component
+from deluge._libtorrent import lt
+from deluge.core.rpcserver import export
+from deluge.plugins.pluginbase import CorePluginBase
from twisted.internet import reactor, defer, task
from twisted.python import randbytes
-from twisted.web import server, resource, static, http
-from twisted.web.static import StaticProducer
-
-import deluge.component as component
-import deluge.configmanager
-from deluge.core.rpcserver import export
-from deluge.log import LOG as log
-from deluge.plugins.pluginbase import CorePluginBase
+from twisted.web import server, resource, static, http, client
+from .filelike import FilelikeObjectResource
from .resource import Resource
logger = logging.getLogger(__name__)
@@ -68,22 +66,17 @@ DEFAULT_PREFS = {
'port': 46123,
'allow_remote': False,
'reset_complete': True,
+ 'use_stream_urls': True,
+ 'auto_open_stream_urls': False,
'remote_username': 'username',
'remote_password': 'password',
}
-from .filelike import FilelikeObjectResource
-MIN_QUEUE_CHUNKS = 6
-EXPECTED_PERCENT = 0.3
-EXPECTED_SIZE = 5*1024*1024
-HANDLERS_TIMEOUT = timedelta(hours=12)
-
-class UnknownTorrentException(Exception):
- pass
-
-class UnknownFileException(Exception):
- pass
+def sleep(seconds):
+ d = defer.Deferred()
+ reactor.callLater(seconds, d.callback, seconds)
+ return d
class FileServeResource(resource.Resource):
isLeaf = True
@@ -106,35 +99,12 @@ class FileServeResource(resource.Resource):
if key not in self.file_mapping:
return resource.NoResource().render(request)
- v = self.file_mapping[key]
- if isinstance(v, static.File):
- return v.render_GET(request)
+ f = self.file_mapping[key]
+ if f.is_complete():
+ return static.File(f.full_path).render_GET(request)
else:
- tf = v.copy()
- tf.open()
- return FilelikeObjectResource(tf, tf.size).render_GET(request)
-
-class AddTorrentResource(Resource):
- isLeaf = True
-
- def __init__(self, client, *args, **kwargs):
- self.client = client
- Resource.__init__(self, *args, **kwargs)
-
- @defer.inlineCallbacks
- def render_POST(self, request):
- torrent_data = request.args.get('torrent_data', None)
- if not torrent_data:
- defer.returnValue(json.dumps({'status': 'error', 'message': 'missing torrent_data in request'}))
-
- torrent_data = torrent_data[0].encode('base64')
-
- torrent_id = yield self.client.add_torrent(torrent_data)
-
- if torrent_id is None:
- defer.returnValue(json.dumps({'status': 'error', 'message': 'failed to add torrent'}))
-
- defer.returnValue(json.dumps({'status': 'success', 'infohash': torrent_id, 'message': 'torrent added successfully'}))
+ tfr = f.open()
+ return FilelikeObjectResource(tfr, f.size).render_GET(request)
class StreamResource(Resource):
isLeaf = True
@@ -157,310 +127,392 @@ class StreamResource(Resource):
result = yield self.client.stream_torrent(infohash[0], path[0])
defer.returnValue(json.dumps(result))
-class TorrentFile(object):
- file_handler = None
- def __init__(self, torrent_handler, file_path, torrent_file_path, size, chunk_size, offset, file_index):
- self.torrent_handler = torrent_handler
- self.file_path = self.path = file_path
- self.torrent_file_path = torrent_file_path
- self.first_chunk = offset / chunk_size
- self.last_chunk = (offset + size) / chunk_size
- self.chunk_size = chunk_size
- self.offset = offset
- self.file_index = file_index
- self.size = size
- self.last_requested_chunk = self.first_chunk
- self.is_closed = False
- self.is_active = False
- self.last_activity = datetime.now()
- self.end_chunks = []
+class UnknownTorrentException(Exception):
+ pass
+
+class UnknownFileException(Exception):
+ pass
+
+class TorrentFileReader(object):
+ def __init__(self, torrent_file):
+ self.torrent_file = torrent_file
+ self.size = torrent_file.size
+ self.position = 0
- self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset
-
- self.torrent_handler.torrent_files[torrent_file_path].append(self)
-
- def open(self):
- if self.is_closed:
- raise IOError('Unable to reopen file')
-
- self.file_handler = open(self.file_path, 'rb')
-
- def get_chunk(self, tell):
- i = (tell + 1) - self.first_chunk_end
- if i <= 0:
- offset = 0
- else:
- offset = (i / self.chunk_size) + 1
-
- return self.first_chunk + offset, self.first_chunk_end + (offset * self.chunk_size)
-
- def prepare_torrent(self, buffer_pieces):
- self.torrent_handler.schedule_chunk(self.first_chunk, 0)
- self.torrent_handler.schedule_chunk(self.last_chunk, 0)
-
- self.end_chunks.append(self.last_chunk)
-
- if self.first_chunk != self.last_chunk:
- self.torrent_handler.schedule_chunk(self.last_chunk-1, 0)
- self.end_chunks.append(self.last_chunk-1)
-
- for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:min(self.first_chunk+buffer_pieces, self.last_chunk)+1], self.first_chunk):
- self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk)
+ self.waiting_for_piece = None
+ self.current_piece = None
+ self.current_piece_data = None
@defer.inlineCallbacks
def read(self, size=1024):
- self.is_active = True
- self.last_activity = datetime.now()
+ required_piece, read_position = self.torrent_file.get_piece_info(self.position)
- tell = self.tell()
- chunk, end_of_chunk = self.get_chunk(tell)
- self.last_requested_chunk = chunk
+ if self.current_piece != required_piece:
+ logger.debug('We are missing piece %i and it is required, requesting' % (required_piece, ))
+ self.waiting_for_piece = required_piece
+ self.current_piece_data = yield self.torrent_file.get_piece_data(required_piece)
+ self.current_piece = required_piece
+ self.waiting_for_piece = None
- logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell))
- yield self.wait_chunk_complete(chunk)
- logger.debug('done waiting %s, %s, %s' % (chunk, size, tell))
+ logger.debug('We can read from local piece from %s size %s from position %s' % (read_position, size, self.position))
+ data = self.current_piece_data[read_position:read_position+size]
+ self.position += len(data)
- defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size)))
-
- def wait_chunk_complete(self, chunk):
- d = defer.Deferred()
-
- def check_if_done():
- if self.torrent_handler.torrent.status.pieces[chunk]:
- return d.callback(True)
-
- self.torrent_handler.schedule_chunk(chunk, 0)
-
- if self.is_closed:
- logger.debug('The file closed, shutting down torrent')
- return
-
- reactor.callLater(1.0, check_if_done)
-
- check_if_done()
-
- return d
-
- def seek(self, offset, whence=os.SEEK_SET):
- return self.file_handler.seek(offset, whence)
+ defer.returnValue(data)
def tell(self):
- return self.file_handler.tell()
+ return self.position
def close(self):
- if not self.is_closed and self in self.torrent_handler.torrent_files[self.torrent_file_path]:
- self.torrent_handler.torrent_files[self.torrent_file_path].remove(self)
- self.is_closed = True
-
- if self.file_handler:
- return self.file_handler.close()
+ self.torrent_file.close(self)
- def copy(self):
- tf = TorrentFile(self.torrent_handler, self.file_path, self.torrent_file_path,
- self.size, self.chunk_size, self.offset, self.file_index)
- return tf
+ def seek(self, offset, whence=os.SEEK_SET):
+ self.position = offset
-class TorrentHandler(object):
- def __init__(self, torrent, torrent_id, core):
+class TorrentFile(object): # can be read from, knows about itself
+ def __init__(self, torrent, first_piece, last_piece, piece_size, offset, path, full_path, size, index):
self.torrent = torrent
- self.torrent_handle = torrent.handle
- self.torrent_id = torrent_id
- self.core = core
- self.priorities = [0] * len(torrent.get_status(['files'])['files'])
- self.torrent_files = defaultdict(list)
- self.priorities_increased = {}
- # need to blackhole all pieces not downloaded yet
- self.last_activity = datetime.now()
- self.update_chunk_priorities()
+ self.first_piece = first_piece
+ self.last_piece = last_piece
+ self.piece_size = piece_size
+ self.offset = offset
+ self.path = path
+ self.size = size
+ self.full_path = full_path
+ self.index = index
+
+ self.file_requested = False
+ self.do_shutdown = False
+ self.first_piece_end = self.piece_size * (self.first_piece + 1) - offset
+ self.waiting_pieces = {}
+ self.current_readers = []
+ self.registered_alert = False
+
+ self.alerts = component.get("AlertManager")
- def is_buffered(self, first_chunk, last_chunk):
- buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x]
- logger.debug('Current buffer status for file (%s): %r' % (first_chunk, buffer_status))
- if buffer_status:
- return False
- else:
- return True
+ def open(self):
+ """
+ Returns a filelike object
+ """
+ if not self.registered_alert:
+ self.alerts.register_handler("read_piece_alert", self.on_alert_got_piece_data)
+ self.registered_alert = True
+ tfr = TorrentFileReader(self)
+ self.current_readers.append(tfr)
+ self.file_requested = False
+ return tfr
- def update_priorities(self):
- self.torrent.set_file_priorities(self.priorities)
+ def close(self, tfr):
+ self.current_readers.remove(tfr)
+ self.torrent.unprioritize_pieces(tfr)
- def schedule_chunk(self, chunk, distance):
- if chunk not in self.priorities_increased:
- logger.debug('Scheduled chunk %s at distance %s' % (chunk, distance))
-
- self.torrent_handle.piece_priority(chunk, (7 if distance <= 4 else 6))
- self.torrent_handle.set_piece_deadline(chunk, 700*(distance+1))
-
- self.priorities_increased[chunk] = True
+ def is_complete(self):
+ torrent_status = self.torrent.torrent.get_status(['file_progress', 'state'])
+ file_progress = torrent_status['file_progress']
+ return file_progress[self.index] == 1.0
- def update_chunk_priority(self, tfs):
- handled_heads = []
- for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True):
- if datetime.now() - tf.last_activity > HANDLERS_TIMEOUT:
- logger.debug('Torrentfile timed out')
- tf.close()
- continue
-
- if tf.last_requested_chunk is not None:
- if handled_heads:
- if tf.last_requested_chunk in handled_heads:
- continue
-
- already_queued = False
- for i in sorted(handled_heads):
- if i > tf.last_requested_chunk:
- if 0 not in self.torrent.status.pieces[tf.last_requested_chunk:i]:
- already_queued = True
- break
- if already_queued:
- continue
-
- offset = tf.last_requested_chunk + 1
-
- current_buffer_offset = 5
- for chunk in tf.end_chunks:
- if not self.torrent.status.pieces[chunk]:
- logger.info('End chunks are not downloaded, setting buffer offset differently')
- current_buffer_offset = 20
- break
-
- currently_downloading = set()
- for peer in self.torrent.handle.get_peer_info():
- if peer.downloading_piece_index != -1:
- currently_downloading.add(peer.downloading_piece_index)
-
- status_increase_count = 0
- current_buffer = 0
- found_buffer_end = False
- for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:tf.last_chunk+1], offset):
- if not chunk_status and chunk not in currently_downloading:
- if not found_buffer_end:
- logger.debug('Found buffer end at %s, have a buffer of %s' % (chunk, current_buffer))
- found_buffer_end = True
-
- if status_increase_count <= MIN_QUEUE_CHUNKS:
- self.schedule_chunk(chunk, chunk-offset)
- elif self.torrent_handle.piece_priority(chunk) == 0:
- self.torrent_handle.piece_priority(chunk, 1)
-
- status_increase_count += 1
- elif not found_buffer_end and chunk not in currently_downloading:
- current_buffer += 1
-
- if status_increase_count >= max(MIN_QUEUE_CHUNKS, current_buffer-current_buffer_offset):
- break
-
- handled_heads.append(tf.last_requested_chunk)
-
- return bool(handled_heads)
-
- def update_chunk_priorities(self): # TODO: check if torrent still exists
- start_time = time.time()
- file_progress = self.torrent.get_status(['file_progress'])['file_progress']
- incomplete_files = False
-
- for torrent_file_path, tfs in self.torrent_files.items():
- if not tfs:
- logger.debug('No heads left for %r' % torrent_file_path)
- del self.torrent_files[torrent_file_path]
- continue
-
- tf = tfs[0]
- if file_progress[tf.file_index] == 1.0:
- logger.info('%s is already complete, skipping' % torrent_file_path)
- continue
-
- if self.update_chunk_priority(tfs):
- self.last_activity = datetime.now()
-
- incomplete_files = True
-
- if not incomplete_files and self.core.config['reset_complete'] and not all(self.priorities):
- logger.info('We are not doing any file streamings, but not downloading all files, changing that.')
- self.priorities = [1] * len(self.priorities)
- self.update_priorities()
-
- if datetime.now() - self.last_activity > HANDLERS_TIMEOUT:
- logger.debug('Torrent handler idle, killing myself.')
- if self.torrent_id in self.core.torrent_handlers:
- del self.core.torrent_handlers[self.torrent_id]
+ def get_piece_info(self, tell):
+ return divmod((self.offset + tell), self.piece_size)
+
+ def on_alert_got_piece_data(self, alert):
+ torrent_id = str(alert.handle.info_hash())
+ if torrent_id != self.torrent.infohash:
return
- logger.debug('Took %s seconds to handle a loop' % (time.time() - start_time))
- reactor.callLater(1, self.update_chunk_priorities)
-
- def blackhole_all_pieces(self, first_chunk, last_chunk):
- for chunk in range(first_chunk, last_chunk+1):
- self.torrent_handle.piece_priority(chunk, 0)
+ logger.debug('Got piece data for piece %s' % alert.piece)
+ if alert.piece not in self.waiting_pieces:
+ logger.debug('Got data for piece %i, but no data needed for this piece?' % alert.piece)
+ return
+ # TODO: check piece size is not zero
+ self.waiting_pieces[alert.piece].callback(alert.buffer)
@defer.inlineCallbacks
- def get_file(self, filepath_or_index):
- status = self.torrent.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path'])
- pieces = self.torrent.status.pieces
- piece_length = status['piece_length']
- files = status['files']
+ def get_piece_data(self, piece):
+ logger.debug('Trying to get piece data for piece %s' % piece)
+ for reader in self.current_readers:
+ if reader.current_piece == piece:
+ defer.returnValue(reader.current_piece_data)
- for i, f, priority, progress in zip(range(len(files)), files, status['file_priorities'], status['file_progress']):
- if i == filepath_or_index or f['path'] == filepath_or_index:
- f['first_piece'] = f['offset'] / piece_length
- f['last_piece'] = (f['offset'] + f['size']) / piece_length
- f['pieces'] = pieces[f['first_piece']:f['last_piece']+1]
- f['priority'] = priority
- f['progress'] = progress
-
- break
- else:
- raise UnknownFileException()
+ if piece not in self.waiting_pieces:
+ self.waiting_pieces[piece] = defer.Deferred()
- fp = os.path.join(status['save_path'], f['path'])
-
- if progress == 1: # file is complete, no need to fire up all the torrent jazz
- defer.returnValue(static.File(fp))
-
- self.priorities = [0] * len(self.priorities)
- self.priorities[f['index']] = 3
- current_tfs = []
- for tfs in self.torrent_files.values():
- if not tfs:
- continue
- tf = tfs[0]
- self.priorities[tf.file_index] = 3
- current_tfs.append((tf, self.torrent_handle.piece_priorities()[tf.first_chunk:tf.last_chunk+1]))
-
- self.update_priorities()
-
- for tf, chunk_status in current_tfs:
- logger.info('Setting chunks to old status')
- for i, chunk in enumerate(chunk_status, tf.first_chunk):
- logger.debug('Setting status on chunk %s back to %s' % (i, chunk))
- self.torrent_handle.piece_priority(i, chunk)
-
- self.torrent.resume()
-
- percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT))
- size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces']))
- expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream.
-
- tf = TorrentFile(self, fp, f['path'], f['size'], status['piece_length'], f['offset'], f['index'])
-
- if len(self.torrent_files[f['path']]) == 1:
- self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk)
-
- tf.prepare_torrent(expected_pieces)
-
- for _ in range(300):
- if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces) and self.is_buffered(tf.last_chunk, tf.last_chunk):
- break
-
+ logger.debug('Waiting for %s' % piece)
+ self.torrent.schedule_piece(self, piece, 0)
+ while not self.torrent.torrent.handle.have_piece(piece):
+ if self.do_shutdown:
+ raise Exception()
+ logger.debug('Did not have piece %i, waiting' % piece)
+ self.torrent.unrelease()
yield sleep(1)
- defer.returnValue(tf)
+ self.torrent.torrent.handle.read_piece(piece)
+
+ data = yield self.waiting_pieces[piece]
+ if piece in self.waiting_pieces:
+ del self.waiting_pieces[piece]
+ logger.debug('Done waiting for piece %i, returning data' % piece)
+ defer.returnValue(data)
+
+ def shutdown(self):
+ #self.alerts.deregister_handler(self.on_alert_torrent_finished)
+ self.do_shutdown = True
-def sleep(seconds):
- d = defer.Deferred()
- reactor.callLater(seconds, d.callback, seconds)
- return d
+class Torrent(object):
+ def __init__(self, torrent_handler, infohash):
+ self.infohash = infohash
+ self.torrent = component.get("TorrentManager").torrents.get(infohash, None)
+ self.torrent_handler = torrent_handler
+
+ if not self.torrent:
+ raise UnknownTorrentException('%s is not a known infohash' % infohash)
+
+ self.torrent_files = None
+ self.priority_increased = defaultdict(set)
+ self.do_shutdown = False
+ self.torrent_released = False # set to True if all the files are set to download
+
+
+ self.populate_files()
+ self.file_priorities = [0] * len(self.torrent_files)
+
+ self.last_piece = self.torrent_files[-1].last_piece
+ self.torrent.handle.set_sequential_download(True)
+ reactor.callLater(0, self.update_piece_priority)
+ reactor.callLater(0, self.blackhole_all_pieces, 0, self.last_piece)
+
+ def populate_files(self):
+ self.torrent_files = []
+
+ status = self.torrent.get_status(['piece_length', 'files', 'save_path'])
+ piece_length = status['piece_length']
+ files = status['files']
+ save_path = status['save_path']
+
+ for f in files:
+ first_piece = f['offset'] / piece_length
+ last_piece = (f['offset'] + f['size']) / piece_length
+ full_path = os.path.join(save_path, f['path'])
+ self.torrent_files.append(TorrentFile(self, first_piece, last_piece, piece_length, f['offset'],
+ f['path'], full_path, f['size'], f['index']))
+
+ return files
+
+ def find_file(self, file_or_index=None):
+ best_file = None
+ biggest_file_size = 0
+
+ for i, f in enumerate(self.torrent_files):
+ if file_or_index is not None:
+ if i == file_or_index or f.path == file_or_index:
+ best_file = f
+ break
+ else:
+ if f.size > biggest_file_size:
+ best_file = f
+
+ return best_file
+
+ def get_file(self, file_or_index=None):
+ f = self.find_file(file_or_index)
+ if f is None:
+ raise UnknownFileException('Was unable to find %s' % file_or_index)
+
+ return f
+
+ def unprioritize_pieces(self, tfr):
+ logger.debug('Unprioritizing pieces for %s' % tfr)
+ currently_downloading = self.get_currently_downloading()
+
+ for piece, increased_by in self.priority_increased.items():
+ if tfr in increased_by:
+ increased_by.remove(tfr)
+ if not increased_by and piece not in currently_downloading and not self.torrent.status.pieces[piece]:
+ logger.debug('Unprioritizing piece %s' % piece)
+ self.torrent.handle.piece_priority(piece, 0)
+
+ def get_currently_downloading(self):
+ currently_downloading = set()
+ for peer in self.torrent.handle.get_peer_info():
+ if peer.downloading_piece_index != -1:
+ currently_downloading.add(peer.downloading_piece_index)
+
+ return currently_downloading
+
+ def blackhole_all_pieces(self, first_piece, last_piece):
+ currently_downloading = self.get_currently_downloading()
+
+ logger.debug('Blacklisting pieces from %i to %i skipping %r' % (first_piece, last_piece, currently_downloading))
+ for piece in range(first_piece, last_piece+1):
+ if piece not in currently_downloading and not self.torrent.status.pieces[piece]:
+ if piece in self.priority_increased:
+ continue
+ logger.debug('Setting piece priority %s to blacklist' % piece)
+ self.torrent.handle.piece_priority(piece, 0)
+
+ def unrelease(self):
+ if self.torrent_released:
+ logger.debug('Unreleasing %s' % self.infohash)
+ self.torrent_released = False
+ self.torrent.set_file_priorities(self.file_priorities)
+ self.blackhole_all_pieces(0, self.last_piece)
+
+ def get_torrent_file(self, file_or_index):
+ f = self.get_file(file_or_index)
+ f.file_requested = True
+
+ self.torrent.resume()
+
+ should_update_priorities = False
+ if self.file_priorities[f.index] == 0:
+ self.file_priorities[f.index] = 3
+ should_update_priorities = True
+
+ if self.torrent_released:
+ should_update_priorities = True
+
+ if should_update_priorities and not f.is_complete(): # Need to do this stuff on seek too
+ self.unrelease()
+
+ return f
+
+ def shutdown(self):
+ logger.info('Shutting down torrent %s' % self.infohash)
+ self.do_shutdown = True
+
+ for tf in self.torrent_files:
+ tf.shutdown()
+
+ def schedule_piece(self, torrent_file, piece, distance):
+ if torrent_file not in self.priority_increased[piece]:
+ if not self.priority_increased[piece]:
+ self.priority_increased[piece].add(torrent_file)
+
+ logger.debug('Scheduled piece %s at distance %s' % (piece, distance))
+
+ self.torrent.handle.piece_priority(piece, (7 if distance <= 4 else 6))
+ self.torrent.handle.set_piece_deadline(piece, 700*(distance+1))
+
+ self.priority_increased[piece].add(torrent_file)
+
+ def do_pieces_schedule(self, torrent_file, currently_downloading, from_piece):
+ logger.debug('Looking for stuff to do with pieces for file %s from piece %s' % (torrent_file, from_piece))
+
+ priority_increased = 0
+ chain_size = 0
+ download_chain_size = 0
+ end_of_chain = False
+
+ current_buffer_offset = 5
+ if self.torrent.status.pieces[torrent_file.last_piece]:
+ if torrent_file.first_piece != torrent_file.last_piece and self.torrent.status.pieces[torrent_file.last_piece-1] \
+ or torrent_file.first_piece == torrent_file.last_piece:
+ current_buffer_offset = 20
+
+ for piece, status in enumerate(self.torrent.status.pieces[from_piece:torrent_file.last_piece+1], from_piece):
+ if not end_of_chain:
+ if status:
+ chain_size += 1
+ elif piece in currently_downloading:
+ download_chain_size += 1
+
+ if not status and piece not in currently_downloading:
+ if not end_of_chain:
+ status_increase = max(11, chain_size-current_buffer_offset)
+ end_of_chain = True
+
+ priority_increased += 1
+ if priority_increased >= status_increase:
+ logger.debug('Done increasing priority for %i pieces' % status_increase)
+ break
+
+ self.schedule_piece(torrent_file, piece, piece-from_piece)
+ else:
+ logger.info('We are done with the rest of this chain, we might be able to increase others')
+ return True
+
+ return False
+
+ def update_piece_priority(self): # if all do_pieces_schedule returns true, allow all pices of file to be downloaded or whole torernt
+ if self.do_shutdown:
+ return
+
+ logger.debug('Updating piece priority for %s' % self.infohash)
+
+ currently_downloading = set()
+ for peer in self.torrent.handle.get_peer_info():
+ if peer.downloading_piece_index != -1:
+ currently_downloading.add(peer.downloading_piece_index)
+
+ all_heads_done = True
+ for f in self.torrent_files:
+ if not f.file_requested and not f.current_readers:
+ continue
+
+ logger.debug('Rescheduling file %s' % f.path)
+
+ if f.file_requested:
+ all_heads_done &= self.do_pieces_schedule(f, currently_downloading, f.first_piece)
+ self.schedule_piece(f, f.last_piece, 0)
+ if f.first_piece != f.last_piece:
+ self.schedule_piece(f, f.last_piece-1, 1)
+
+ for tfr in f.current_readers:
+ if tfr.waiting_for_piece is not None:
+ logger.debug('Scheduling based on waiting for piece %s' % tfr.waiting_for_piece)
+ all_heads_done &= self.do_pieces_schedule(f, currently_downloading, tfr.waiting_for_piece)
+ elif tfr.current_piece is not None:
+ logger.debug('Scheduling based on current piece %s' % tfr.current_piece)
+ all_heads_done &= self.do_pieces_schedule(f, currently_downloading, tfr.current_piece)
+
+ if all_heads_done and not self.torrent_released:
+ logger.debug('We are already done with all heads, figuring out what to do next')
+
+ if self.torrent_handler.config['reset_complete']:
+ self.torrent_released = True
+ logger.debug('Resetting all disabled files')
+ file_priorities = [(1 if fp == 0 else fp) for fp in self.file_priorities]
+ self.torrent.set_file_priorities(file_priorities)
+
+ if not all_heads_done and self.torrent_released:
+ logger.debug('Seems like the torrent was released too early')
+ self.unrelease()
+
+ reactor.callLater(0.3, self.update_piece_priority)
+
+class TorrentHandler(object):
+ def __init__(self, config):
+ self.torrents = {}
+ self.config = config
+
+ self.alerts = component.get("AlertManager")
+ self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
+
+ def get_stream(self, infohash, file_or_index=None):
+ logger.info('Trying to stream infohash %s and file %s' % (infohash, file_or_index))
+ if infohash not in self.torrents:
+ self.torrents[infohash] = Torrent(self, infohash)
+
+ return self.torrents[infohash].get_torrent_file(file_or_index)
+
+ def on_alert_torrent_removed(self, alert):
+ try:
+ torrent_id = str(alert.handle.info_hash())
+ except (RuntimeError, KeyError):
+ logger.warning('Failed to handle on torrent remove alert')
+ return
+
+ if torrent_id not in self.torrents:
+ return
+
+ self.torrents[torrent_id].shutdown()
+ del self.torrents[torrent_id]
+
+ def shutdown(self):
+ logger.debug('Shutting down TorrentHandler')
+ self.alerts.deregister_handler(self.on_alert_torrent_removed)
+ for torrent in self.torrents.values():
+ torrent.shutdown()
class Core(CorePluginBase):
def enable(self):
@@ -470,9 +522,6 @@ class Core(CorePluginBase):
self.resource = Resource()
self.resource.putChild('file', self.fsr)
if self.config['allow_remote']:
- self.resource.putChild('add_torrent', AddTorrentResource(username=self.config['remote_username'],
- password=self.config['remote_password'],
- client=self))
self.resource.putChild('stream', StreamResource(username=self.config['remote_username'],
password=self.config['remote_password'],
client=self))
@@ -487,7 +536,7 @@ class Core(CorePluginBase):
except AttributeError:
logger.warning('Unable to exclude partial pieces')
- self.torrent_handlers = {}
+ self.torrent_handler = TorrentHandler(self.config)
try:
self.listening = reactor.listenTCP(self.config['port'], self.site, interface=self.config['ip'])
@@ -497,6 +546,7 @@ class Core(CorePluginBase):
@defer.inlineCallbacks
def disable(self):
self.site.stopFactory()
+ self.torrent_handler.shutdown()
yield self.listening.stopListening()
def update(self):
@@ -506,7 +556,6 @@ class Core(CorePluginBase):
@defer.inlineCallbacks
def set_config(self, config):
"""Sets the config dictionary"""
- do_reload = False
for key in config.keys():
self.config[key] = config[key]
self.config.save()
@@ -521,43 +570,36 @@ class Core(CorePluginBase):
@export
@defer.inlineCallbacks
- def add_torrent(self, torrent_data):
- core = component.get("Core")
- tid = yield core.add_torrent_file('file.torrent', torrent_data, {'add_paused': True})
+ def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None):
+ tor = component.get("TorrentManager").torrents.get(infohash, None)
- tor = component.get("TorrentManager").torrents.get(tid, None)
-
- state = tor.get_status(['files'])
- tor.set_file_priorities([0] * len(state['files']))
-
- defer.returnValue(tid)
-
- def get_torrent_handler(self, tid):
- if tid not in self.torrent_handlers:
- tor = component.get("TorrentManager").torrents.get(tid, None)
+ if tor is None:
+ logger.info('Did not find torrent, must add it')
- if tor is None:
- raise UnknownTorrentException()
+ if not filedump and url:
+ filedump = yield client.getPage(url)
- self.torrent_handlers[tid] = TorrentHandler(tor, tid, self)
+ if not filedump:
+ defer.returnValue({'status': 'error', 'message': 'unable to find torrent, provide infohash, url or filedump'})
+
+ torrent_info = lt.torrent_info(lt.bdecode(filedump))
+ infohash = str(torrent_info.info_hash())
- return self.torrent_handlers[tid]
-
- @export
- @defer.inlineCallbacks
- def stream_torrent(self, tid, filepath_or_index):
+ core = component.get("Core")
+ try:
+ yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True})
+ except:
+ defer.returnValue({'status': 'error', 'message': 'failed to add torrent'})
+
try:
- torrent_handler = yield self.get_torrent_handler(tid)
- except UnknownTorrentException: # torrent isn't added yet
- defer.returnValue({'status': 'error', 'message': 'torrent_not_found'})
-
- try:
- tf = yield torrent_handler.get_file(filepath_or_index)
- except UnknownFileException:
- defer.returnValue({'status': 'error', 'message': 'file_not_found'})
+ tf = self.torrent_handler.get_stream(infohash, filepath_or_index)
+ except UnknownTorrentException:
+ defer.returnValue({'status': 'error', 'message': 'unable to find torrent, probably failed to add it'})
defer.returnValue({
'status': 'success',
+ 'use_stream_urls': self.config['use_stream_urls'],
+ 'auto_open_stream_urls': self.config['auto_open_stream_urls'],
'url': 'http://%s:%s/file/%s/%s' % (self.config.config['ip'], self.config.config['port'],
- self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path)))
+ self.fsr.add_file(tf), urllib.quote_plus(os.path.basename(tf.path)))
})
\ No newline at end of file
diff --git a/streaming/data/config.glade b/streaming/data/config.glade
index 5d0a43f..a061b44 100644
--- a/streaming/data/config.glade
+++ b/streaming/data/config.glade
@@ -33,6 +33,28 @@
+
+
+ True
+ 0
+ Use stream urls:
+
+
+ 2
+
+
+
+
+
+ True
+ 0
+ Auto-open stream urls:
+
+
+ 3
+
+
+
True
@@ -40,7 +62,7 @@
Reset "do not download" when streamed file is complete:
- 2
+ 4
@@ -51,7 +73,7 @@
Allow remote control:
- 3
+ 5
@@ -62,7 +84,7 @@
Remote username:
- 4
+ 6
@@ -73,7 +95,7 @@
Remote password:
- 5
+ 7
@@ -108,7 +130,7 @@
-
+
True
True
@@ -118,7 +140,7 @@
-
+
True
True
@@ -128,7 +150,7 @@
-
+
True
True
@@ -137,6 +159,26 @@
+
+
+ True
+ True
+
+
+ 5
+
+
+
+
+
+ True
+ True
+
+
+ 6
+
+
+
True
@@ -144,13 +186,13 @@
True
- 5
+ 7
- 1
+ 7
diff --git a/streaming/data/streaming.js b/streaming/data/streaming.js
index facb73a..0dae3a0 100644
--- a/streaming/data/streaming.js
+++ b/streaming/data/streaming.js
@@ -68,6 +68,18 @@ PreferencePage = Ext.extend(Ext.Panel, {
fieldLabel: 'IP'
}));
+ om.bind('use_stream_urls', fieldset.add({
+ xtype: 'checkbox',
+ name: 'use_stream_urls',
+ fieldLabel: 'Use StreamProtocol urls',
+ }));
+
+ om.bind('auto_open_stream_urls', fieldset.add({
+ xtype: 'checkbox',
+ name: 'auto_open_stream_urls',
+ fieldLabel: 'AutoOpen StreamProtocol urls',
+ }));
+
om.bind('reset_complete', fieldset.add({
xtype: 'checkbox',
name: 'reset_complete',
@@ -150,10 +162,19 @@ StreamingPlugin = Ext.extend(Deluge.Plugin, {
var fileIndex = nodes[0].attributes.fileIndex;
var tid = files.torrentId;
if (fileIndex >= 0) {
- deluge.client.streaming.stream_torrent(tid, fileIndex, {
+ deluge.client.streaming.stream_torrent(tid, null, null, fileIndex, {
success: function (result) {
+ console.log('Got result', result);
if (result.status == 'success') {
- Ext.Msg.alert('Stream ready', 'URL for stream: ' + result.url + '');
+ var url = result.url;
+ if (result.use_stream_urls) {
+ url = 'stream+' + url;
+ if (result.auto_open_stream_urls) {
+ window.location.assign(url);
+ return;
+ }
+ }
+ Ext.Msg.alert('Stream ready', 'URL for stream: ' + url + '');
} else {
Ext.Msg.alert('Stream failed', 'Error message: ' + result.message);
}
diff --git a/streaming/filelike.py b/streaming/filelike.py
index acef851..ea4ff65 100644
--- a/streaming/filelike.py
+++ b/streaming/filelike.py
@@ -68,6 +68,7 @@ class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer):
if done:
self.request.unregisterProducer()
self.request.finish()
+ self.stopProducing()
self.request = None
class FilelikeObjectResource(static.File):
@@ -141,6 +142,7 @@ class FilelikeObjectResource(static.File):
producer = self.makeProducer(request, self.fileObject)
if request.method == 'HEAD':
+ self.fileObject.close()
return ''
producer.start()
diff --git a/streaming/gtkui.py b/streaming/gtkui.py
index 38008c0..cdf9ce5 100644
--- a/streaming/gtkui.py
+++ b/streaming/gtkui.py
@@ -37,7 +37,9 @@
# statement from all source files in the program, then also delete it here.
#
+import json
import gtk
+import webbrowser
from deluge.log import LOG as log
from deluge.ui.client import client
@@ -46,8 +48,37 @@ from deluge.plugins.pluginbase import GtkPluginBase
import deluge.component as component
import deluge.common
+from twisted.internet import reactor, defer, threads
+from twisted.web import server, resource
+
from common import get_resource
+class LocalAddResource(resource.Resource):
+ gtkui = None
+ isLeaf = True
+
+ def __init__(self, gtkui):
+ self.gtkui = gtkui
+ resource.Resource.__init__(self)
+
+ def render_GET(self, request):
+ useragent = request.getHeader('User-Agent')
+ if 'Deluge-Streamer' not in useragent:
+ request.setResponseCode(401)
+ return 'Unauthorized'
+
+ torrent_url = request.args.get('url', None)
+ if not torrent_url:
+ return json.dumps({'status': 'error', 'message': 'missing url in request'})
+
+ torrent_file = request.args.get('file', None)
+ if torrent_file:
+ torrent_file = torrent_file[0]
+
+ client.streaming.stream_torrent(url=torrent_url[0], filepath_or_index=torrent_file).addCallback(self.gtkui.stream_ready)
+
+ return json.dumps({'status': 'ok', 'message': 'queued'})
+
class GtkUI(GtkPluginBase):
def enable(self):
self.glade = gtk.glade.XML(get_resource("config.glade"))
@@ -67,7 +98,12 @@ class GtkUI(GtkPluginBase):
self.sep.show()
self.item.show()
+
+ self.resource = LocalAddResource(self)
+ self.site = server.Site(self.resource)
+ self.listening = reactor.listenTCP(40747, self.site, interface='127.0.0.1')
+ @defer.inlineCallbacks
def disable(self):
component.get("Preferences").remove_page("Streaming")
component.get("PluginManager").deregister_hook("on_apply_prefs", self.on_apply_prefs)
@@ -77,12 +113,17 @@ class GtkUI(GtkPluginBase):
file_menu.remove(self.item)
file_menu.remove(self.sep)
+
+ self.site.stopFactory()
+ yield self.listening.stopListening()
def on_apply_prefs(self):
log.debug("applying prefs for Streaming")
config = {
"ip": self.glade.get_widget("input_ip").get_text(),
"port": int(self.glade.get_widget("input_port").get_text()),
+ "use_stream_urls": self.glade.get_widget("input_use_stream_urls").get_active(),
+ "auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(),
"allow_remote": self.glade.get_widget("input_allow_remote").get_active(),
"reset_complete": self.glade.get_widget("input_reset_complete").get_active(),
"remote_username": self.glade.get_widget("input_remote_username").get_text(),
@@ -98,11 +139,26 @@ class GtkUI(GtkPluginBase):
"callback for on show_prefs"
self.glade.get_widget("input_ip").set_text(config["ip"])
self.glade.get_widget("input_port").set_text(str(config["port"]))
+ self.glade.get_widget("input_use_stream_urls").set_active(config["use_stream_urls"])
+ self.glade.get_widget("input_auto_open_stream_urls").set_active(config["auto_open_stream_urls"])
self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"])
self.glade.get_widget("input_reset_complete").set_active(config["reset_complete"])
self.glade.get_widget("input_remote_username").set_text(config["remote_username"])
self.glade.get_widget("input_remote_password").set_text(config["remote_password"])
+ def stream_ready(self, result):
+ if result['status'] == 'success':
+ if result.get('use_stream_urls', False):
+ url = "stream+%s" % result['url']
+ if result.get('auto_open_stream_urls', False):
+ threads.deferToThread(webbrowser.open, url)
+ else:
+ dialogs.InformationDialog('Stream ready', 'Click here to open it' % url).run()
+ else:
+ dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run()
+ else:
+ dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run()
+
def on_menuitem_stream(self, data=None):
torrent_id = component.get("TorrentView").get_selected_torrents()[0]
@@ -113,13 +169,7 @@ class GtkUI(GtkPluginBase):
for path in paths:
selected.append(ft.treestore.get_iter(path))
- def stream_ready(result):
- if result['status'] == 'success':
- dialogs.ErrorDialog('Stream ready', 'Copy the link into a media player', details=result['url']).run()
- else:
- dialogs.ErrorDialog('Stream failed', 'Was unable to prepare the stream', details=result).run()
-
for select in selected:
path = ft.get_file_path(select)
- client.streaming.stream_torrent(torrent_id, path).addCallback(stream_ready)
- break
\ No newline at end of file
+ client.streaming.stream_torrent(infohash=torrent_id, filepath_or_index=path).addCallback(self.stream_ready)
+ break