diff --git a/streaming/core.py b/streaming/core.py index 023f90d..89fb0a9 100644 --- a/streaming/core.py +++ b/streaming/core.py @@ -44,6 +44,8 @@ import math import os import urllib +from collections import defaultdict + from twisted.internet import reactor, defer, task from twisted.python import randbytes from twisted.web import server, resource, static, http @@ -67,7 +69,15 @@ DEFAULT_PREFS = { from .filelike import FilelikeObjectResource -MAX_QUEUE_CHUNKS = 12 +MIN_QUEUE_CHUNKS = 6 +EXPECTED_PERCENT = 0.3 +EXPECTED_SIZE = 5*1024*1024 + +class UnknownTorrentException(Exception): + pass + +class UnknownFileException(Exception): + pass class FileServeResource(resource.Resource): isLeaf = True @@ -88,7 +98,7 @@ class FileServeResource(resource.Resource): def render_GET(self, request): key = request.path.split('/')[2] if key not in self.file_mapping: - return resource.NoResource().render() + return resource.NoResource().render(request) tf = self.file_mapping[key].copy() tf.open() @@ -138,9 +148,8 @@ class StreamResource(Resource): defer.returnValue(json.dumps(result)) class TorrentFile(object): - def __init__(self, torrent, file_path, size, chunk_size, offset): - self.torrent = torrent - self.torrent_handle = torrent.handle + def __init__(self, torrent_handler, file_path, size, chunk_size, offset): + self.torrent_handler = torrent_handler self.file_path = file_path self.first_chunk = offset / chunk_size self.last_chunk = (offset + size) / chunk_size @@ -149,13 +158,16 @@ class TorrentFile(object): self.size = size self.last_requested_chunk = self.first_chunk self.is_closed = False - - self.priorities_increased = {} + self.is_active = False self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset + + self.torrent_handler.torrent_files[self.file_path].append(self) def open(self): - self.update_chunk_priority() + if self.is_closed: + raise IOError('Unable to reopen file') + self.file_handler = open(self.file_path, 'rb') def get_chunk(self, tell): @@ -167,17 +179,39 @@ class TorrentFile(object): return self.first_chunk + offset, self.first_chunk_end + (offset * self.chunk_size) + def prepare_torrent(self, buffer_pieces): + self.torrent_handler.schedule_chunk(self.first_chunk, 0) + self.torrent_handler.schedule_chunk(self.last_chunk, 1) + + for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:self.first_chunk+buffer_pieces+1], self.first_chunk): + self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk) + + @defer.inlineCallbacks + def read(self, size=1024): + self.is_active = True + + tell = self.tell() + chunk, end_of_chunk = self.get_chunk(tell) + self.last_requested_chunk = chunk + + logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell)) + yield self.wait_chunk_complete(chunk) + logger.debug('done waiting %s, %s, %s' % (chunk, size, tell)) + + defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size))) + def wait_chunk_complete(self, chunk): d = defer.Deferred() def check_if_done(): - if self.torrent.status.pieces[chunk]: + if self.torrent_handler.torrent.status.pieces[chunk]: return d.callback(True) - self.set_prio(chunk, 7) + self.torrent_handler.schedule_chunk(chunk, 0) if self.is_closed: - return d.errback(None) + logger.debug('The file closed, shutting down torrent') + return reactor.callLater(1.0, check_if_done) @@ -185,59 +219,6 @@ class TorrentFile(object): return d - def set_prio(self, chunk, prio): - if self.priorities_increased.get(chunk, 0) < prio: - self.torrent_handle.piece_priority(chunk, prio) - self.priorities_increased[chunk] = prio - - if prio == 7: - self.torrent_handle.set_piece_deadline(chunk, 100) - - def prepare_torrent(self, buffer_pieces): - self.set_prio(self.first_chunk, 7) - self.set_prio(self.last_chunk, 7) - - for chunk, chunk_status in enumerate(self.torrent.status.pieces[self.first_chunk:self.first_chunk+buffer_pieces+1], self.first_chunk): - self.set_prio(chunk, 7) - - self.update_chunk_priority() - - def is_buffered(self, expected_pieces): - buffer_status = [x for x in self.torrent.status.pieces[self.first_chunk:self.first_chunk+expected_pieces+1] if not x] - logger.debug('Current buffer status: %r' % buffer_status) - if buffer_status: - return False - else: - return True - - def update_chunk_priority(self): # no need to do this when the file is complete - if self.is_closed: - return - - if self.last_requested_chunk is not None: - offset = self.last_requested_chunk + 1 - - status_increase_count = 0 - for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:self.last_chunk+1], offset): - if not chunk_status: - self.set_prio(chunk, 7) - status_increase_count += 1 - - if status_increase_count > MAX_QUEUE_CHUNKS: - break - - reactor.callLater(4, self.update_chunk_priority) - - @defer.inlineCallbacks - def read(self, size=1024): - tell = self.tell() - chunk, end_of_chunk = self.get_chunk(tell) - self.last_requested_chunk = chunk - logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell)) - yield self.wait_chunk_complete(chunk) - logger.debug('done waiting %s, %s, %s' % (chunk, size, tell)) - defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size))) - def seek(self, offset, whence=os.SEEK_SET): return self.file_handler.seek(offset, whence) @@ -245,15 +226,145 @@ class TorrentFile(object): return self.file_handler.tell() def close(self): + if not self.is_closed and self in self.torrent_handler.torrent_files[self.file_path]: + self.torrent_handler.torrent_files[self.file_path].remove(self) self.is_closed = True return self.file_handler.close() def copy(self): - tf = TorrentFile(self.torrent, self.file_path, self.size, self.chunk_size, self.offset) - tf.priorities_increased = self.priorities_increased - + tf = TorrentFile(self.torrent_handler, self.file_path, self.size, self.chunk_size, self.offset) return tf +class TorrentHandler(object): + def __init__(self, torrent): + self.torrent = torrent + self.torrent_handle = torrent.handle + self.priorities = [0] * len(torrent.get_status(['files'])['files']) # TODO: get current priorities and use those instead + self.torrent_files = defaultdict(list) + self.priorities_increased = {} + # need to blackhole all pieces not downloaded yet + self.update_chunk_priorities() + + def is_buffered(self, first_chunk, last_chunk): + buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x] + logger.debug('Current buffer status for file (%s): %r' % (first_chunk, buffer_status)) + if buffer_status: + return False + else: + return True + + def update_priorities(self): + self.torrent.set_file_priorities(self.priorities) + + def schedule_chunk(self, chunk, distance): + if chunk not in self.priorities_increased: + logger.debug('Scheduled chunk %s at distance %s' % (chunk, distance)) + + self.torrent_handle.piece_priority(chunk, (7 if distance <= 4 else 6)) + self.torrent_handle.set_piece_deadline(chunk, 700*(distance+1)) + + self.priorities_increased[chunk] = True + + def update_chunk_priority(self, tfs): + handled_heads = [] + for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True): + if tf.is_closed: + continue + + if tf.last_requested_chunk is not None: + if handled_heads: + if tf.last_requested_chunk in handled_heads: + continue + + already_queued = False + for i in sorted(handled_heads): + if i > tf.last_requested_chunk: + if 0 not in self.torrent.status.pieces[tf.last_requested_chunk:i]: + already_queued = True + break + if already_queued: + continue + + offset = tf.last_requested_chunk + 1 + + status_increase_count = 0 + current_buffer = 0 + found_buffer_end = False + for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:tf.last_chunk+1], offset): + if not chunk_status: + if not found_buffer_end: + logger.debug('Found buffer end at %s, have a buffer of %s' % (chunk, current_buffer)) + found_buffer_end = True + + if status_increase_count <= MIN_QUEUE_CHUNKS: + self.schedule_chunk(chunk, chunk-offset) + elif self.torrent_handle.piece_priority(chunk) == 0: + self.torrent_handle.piece_priority(chunk, 1) + + status_increase_count += 1 + elif not found_buffer_end: + current_buffer += 1 + + if status_increase_count >= max(MIN_QUEUE_CHUNKS, current_buffer-5): + break + + handled_heads.append(tf.last_requested_chunk) + + def update_chunk_priorities(self): + for tfs in self.torrent_files.values(): + self.update_chunk_priority(tfs) + + reactor.callLater(2, self.update_chunk_priorities) + + def blackhole_all_pieces(self, first_chunk, last_chunk): + for chunk in range(first_chunk, last_chunk+1): + self.torrent_handle.piece_priority(chunk, 0) + + @defer.inlineCallbacks + def get_file(self, filepath): + status = self.torrent.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path']) + pieces = self.torrent.status.pieces + piece_length = status['piece_length'] + files = status['files'] + + for f, priority, progress in zip(files, status['file_priorities'], status['file_progress']): + if f['path'] == filepath: + f['first_piece'] = f['offset'] / piece_length + f['last_piece'] = (f['offset'] + f['size']) / piece_length + f['pieces'] = pieces[f['first_piece']:f['last_piece']+1] + f['priority'] = priority + f['progress'] = progress + + break + else: + raise UnknownFileException() + + self.priorities[f['index']] = 3 + self.update_priorities() + + self.torrent.resume() + + percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT)) + size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces'])) + expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream. + + fp = os.path.join(status['save_path'], f['path']) + + tf = TorrentFile(self, fp, f['size'], status['piece_length'], f['offset']) + + if len(self.torrent_files[fp]) == 1: + self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk) + + tf.prepare_torrent(expected_pieces) + + for _ in range(300): + if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces): + break + + yield sleep(1) + + defer.returnValue(tf) + def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) @@ -272,6 +383,13 @@ class Core(CorePluginBase): self.site = server.Site(self.resource) self.listening = reactor.listenTCP(self.config.config['port'], self.site, interface=self.config.config['ip']) + + session = component.get("Core").session + settings = session.get_settings() + settings['prioritize_partial_pieces'] = True + session.set_settings(settings) + + self.torrent_handlers = {} @defer.inlineCallbacks def disable(self): @@ -311,59 +429,32 @@ class Core(CorePluginBase): defer.returnValue(tid) + def get_torrent_handler(self, tid): + if tid not in self.torrent_handlers: + tor = component.get("TorrentManager").torrents.get(tid, None) + + if tor is None: + raise UnknownTorrentException() + + self.torrent_handlers[tid] = TorrentHandler(tor) + + return self.torrent_handlers[tid] + @export @defer.inlineCallbacks def stream_torrent(self, tid, filepath): - tor = component.get("TorrentManager").torrents.get(tid, None) - - if tor is None: # torrent isn't downloaded yet + try: + torrent_handler = yield self.get_torrent_handler(tid) + except UnknownTorrentException: # torrent isn't added yet defer.returnValue({'status': 'error', 'message': 'torrent_not_found'}) - status = tor.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path']) - pieces = tor.status.pieces - piece_length = status['piece_length'] - files = status['files'] - - for f, priority, progress in zip(files, status['file_priorities'], status['file_progress']): - f['first_piece'] = f['offset'] / piece_length - f['last_piece'] = (f['offset'] + f['size']) / piece_length - f['pieces'] = pieces[f['first_piece']:f['last_piece']+1] - f['priority'] = priority - f['progress'] = progress - - f = [f for f in files if f['path'] == filepath] - - if not f: # file not found in torrent + try: + tf = yield torrent_handler.get_file(filepath) + except UnknownFileException: defer.returnValue({'status': 'error', 'message': 'file_not_found'}) - f = f[0] - - priorities = [0] * len(tor.get_status(['files'])['files']) - priorities[f['index']] = 1 - tor.set_file_priorities(priorities) - - tor.resume() - - EXPECTED_PERCENT = 2.0 - EXPECTED_SIZE = 5*1024*1024 - - percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT)) - size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces'])) - expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream. - - fp = os.path.join(status['save_path'], f['path']) - - tf = TorrentFile(tor, fp, f['size'], status['piece_length'], f['offset']) - tf.prepare_torrent(expected_pieces) - - for _ in range(300): - if os.path.isfile(fp) and tf.is_buffered(expected_pieces): - break - - yield sleep(1) defer.returnValue({ 'status': 'success', 'url': 'http://%s:%s/file/%s/%s' % (self.config.config['ip'], self.config.config['port'], - self.fsr.add_file(tf), - urllib.quote(f['path'].split('/')[-1])) + self.fsr.add_file(tf), os.path.basename(tf.file_path)) }) \ No newline at end of file