improved buffering algorithm and fixed an array of minor bugs

This commit is contained in:
JohnDoee
2015-02-03 12:33:06 +01:00
parent 85a81df9f5
commit da45f41012

View File

@@ -44,6 +44,8 @@ import math
import os
import urllib
from collections import defaultdict
from twisted.internet import reactor, defer, task
from twisted.python import randbytes
from twisted.web import server, resource, static, http
@@ -67,7 +69,15 @@ DEFAULT_PREFS = {
from .filelike import FilelikeObjectResource
MAX_QUEUE_CHUNKS = 12
MIN_QUEUE_CHUNKS = 6
EXPECTED_PERCENT = 0.3
EXPECTED_SIZE = 5*1024*1024
class UnknownTorrentException(Exception):
pass
class UnknownFileException(Exception):
pass
class FileServeResource(resource.Resource):
isLeaf = True
@@ -88,7 +98,7 @@ class FileServeResource(resource.Resource):
def render_GET(self, request):
key = request.path.split('/')[2]
if key not in self.file_mapping:
return resource.NoResource().render()
return resource.NoResource().render(request)
tf = self.file_mapping[key].copy()
tf.open()
@@ -138,9 +148,8 @@ class StreamResource(Resource):
defer.returnValue(json.dumps(result))
class TorrentFile(object):
def __init__(self, torrent, file_path, size, chunk_size, offset):
self.torrent = torrent
self.torrent_handle = torrent.handle
def __init__(self, torrent_handler, file_path, size, chunk_size, offset):
self.torrent_handler = torrent_handler
self.file_path = file_path
self.first_chunk = offset / chunk_size
self.last_chunk = (offset + size) / chunk_size
@@ -149,13 +158,16 @@ class TorrentFile(object):
self.size = size
self.last_requested_chunk = self.first_chunk
self.is_closed = False
self.priorities_increased = {}
self.is_active = False
self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset
self.torrent_handler.torrent_files[self.file_path].append(self)
def open(self):
self.update_chunk_priority()
if self.is_closed:
raise IOError('Unable to reopen file')
self.file_handler = open(self.file_path, 'rb')
def get_chunk(self, tell):
@@ -167,17 +179,39 @@ class TorrentFile(object):
return self.first_chunk + offset, self.first_chunk_end + (offset * self.chunk_size)
def prepare_torrent(self, buffer_pieces):
self.torrent_handler.schedule_chunk(self.first_chunk, 0)
self.torrent_handler.schedule_chunk(self.last_chunk, 1)
for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:self.first_chunk+buffer_pieces+1], self.first_chunk):
self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk)
@defer.inlineCallbacks
def read(self, size=1024):
self.is_active = True
tell = self.tell()
chunk, end_of_chunk = self.get_chunk(tell)
self.last_requested_chunk = chunk
logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell))
yield self.wait_chunk_complete(chunk)
logger.debug('done waiting %s, %s, %s' % (chunk, size, tell))
defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size)))
def wait_chunk_complete(self, chunk):
d = defer.Deferred()
def check_if_done():
if self.torrent.status.pieces[chunk]:
if self.torrent_handler.torrent.status.pieces[chunk]:
return d.callback(True)
self.set_prio(chunk, 7)
self.torrent_handler.schedule_chunk(chunk, 0)
if self.is_closed:
return d.errback(None)
logger.debug('The file closed, shutting down torrent')
return
reactor.callLater(1.0, check_if_done)
@@ -185,59 +219,6 @@ class TorrentFile(object):
return d
def set_prio(self, chunk, prio):
if self.priorities_increased.get(chunk, 0) < prio:
self.torrent_handle.piece_priority(chunk, prio)
self.priorities_increased[chunk] = prio
if prio == 7:
self.torrent_handle.set_piece_deadline(chunk, 100)
def prepare_torrent(self, buffer_pieces):
self.set_prio(self.first_chunk, 7)
self.set_prio(self.last_chunk, 7)
for chunk, chunk_status in enumerate(self.torrent.status.pieces[self.first_chunk:self.first_chunk+buffer_pieces+1], self.first_chunk):
self.set_prio(chunk, 7)
self.update_chunk_priority()
def is_buffered(self, expected_pieces):
buffer_status = [x for x in self.torrent.status.pieces[self.first_chunk:self.first_chunk+expected_pieces+1] if not x]
logger.debug('Current buffer status: %r' % buffer_status)
if buffer_status:
return False
else:
return True
def update_chunk_priority(self): # no need to do this when the file is complete
if self.is_closed:
return
if self.last_requested_chunk is not None:
offset = self.last_requested_chunk + 1
status_increase_count = 0
for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:self.last_chunk+1], offset):
if not chunk_status:
self.set_prio(chunk, 7)
status_increase_count += 1
if status_increase_count > MAX_QUEUE_CHUNKS:
break
reactor.callLater(4, self.update_chunk_priority)
@defer.inlineCallbacks
def read(self, size=1024):
tell = self.tell()
chunk, end_of_chunk = self.get_chunk(tell)
self.last_requested_chunk = chunk
logger.debug('waiting for chunk %s, %s, %s' % (chunk, size, tell))
yield self.wait_chunk_complete(chunk)
logger.debug('done waiting %s, %s, %s' % (chunk, size, tell))
defer.returnValue(self.file_handler.read(min(end_of_chunk-tell, size)))
def seek(self, offset, whence=os.SEEK_SET):
return self.file_handler.seek(offset, whence)
@@ -245,15 +226,145 @@ class TorrentFile(object):
return self.file_handler.tell()
def close(self):
if not self.is_closed and self in self.torrent_handler.torrent_files[self.file_path]:
self.torrent_handler.torrent_files[self.file_path].remove(self)
self.is_closed = True
return self.file_handler.close()
def copy(self):
tf = TorrentFile(self.torrent, self.file_path, self.size, self.chunk_size, self.offset)
tf.priorities_increased = self.priorities_increased
tf = TorrentFile(self.torrent_handler, self.file_path, self.size, self.chunk_size, self.offset)
return tf
class TorrentHandler(object):
def __init__(self, torrent):
self.torrent = torrent
self.torrent_handle = torrent.handle
self.priorities = [0] * len(torrent.get_status(['files'])['files']) # TODO: get current priorities and use those instead
self.torrent_files = defaultdict(list)
self.priorities_increased = {}
# need to blackhole all pieces not downloaded yet
self.update_chunk_priorities()
def is_buffered(self, first_chunk, last_chunk):
buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x]
logger.debug('Current buffer status for file (%s): %r' % (first_chunk, buffer_status))
if buffer_status:
return False
else:
return True
def update_priorities(self):
self.torrent.set_file_priorities(self.priorities)
def schedule_chunk(self, chunk, distance):
if chunk not in self.priorities_increased:
logger.debug('Scheduled chunk %s at distance %s' % (chunk, distance))
self.torrent_handle.piece_priority(chunk, (7 if distance <= 4 else 6))
self.torrent_handle.set_piece_deadline(chunk, 700*(distance+1))
self.priorities_increased[chunk] = True
def update_chunk_priority(self, tfs):
handled_heads = []
for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True):
if tf.is_closed:
continue
if tf.last_requested_chunk is not None:
if handled_heads:
if tf.last_requested_chunk in handled_heads:
continue
already_queued = False
for i in sorted(handled_heads):
if i > tf.last_requested_chunk:
if 0 not in self.torrent.status.pieces[tf.last_requested_chunk:i]:
already_queued = True
break
if already_queued:
continue
offset = tf.last_requested_chunk + 1
status_increase_count = 0
current_buffer = 0
found_buffer_end = False
for chunk, chunk_status in enumerate(self.torrent.status.pieces[offset:tf.last_chunk+1], offset):
if not chunk_status:
if not found_buffer_end:
logger.debug('Found buffer end at %s, have a buffer of %s' % (chunk, current_buffer))
found_buffer_end = True
if status_increase_count <= MIN_QUEUE_CHUNKS:
self.schedule_chunk(chunk, chunk-offset)
elif self.torrent_handle.piece_priority(chunk) == 0:
self.torrent_handle.piece_priority(chunk, 1)
status_increase_count += 1
elif not found_buffer_end:
current_buffer += 1
if status_increase_count >= max(MIN_QUEUE_CHUNKS, current_buffer-5):
break
handled_heads.append(tf.last_requested_chunk)
def update_chunk_priorities(self):
for tfs in self.torrent_files.values():
self.update_chunk_priority(tfs)
reactor.callLater(2, self.update_chunk_priorities)
def blackhole_all_pieces(self, first_chunk, last_chunk):
for chunk in range(first_chunk, last_chunk+1):
self.torrent_handle.piece_priority(chunk, 0)
@defer.inlineCallbacks
def get_file(self, filepath):
status = self.torrent.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path'])
pieces = self.torrent.status.pieces
piece_length = status['piece_length']
files = status['files']
for f, priority, progress in zip(files, status['file_priorities'], status['file_progress']):
if f['path'] == filepath:
f['first_piece'] = f['offset'] / piece_length
f['last_piece'] = (f['offset'] + f['size']) / piece_length
f['pieces'] = pieces[f['first_piece']:f['last_piece']+1]
f['priority'] = priority
f['progress'] = progress
break
else:
raise UnknownFileException()
self.priorities[f['index']] = 3
self.update_priorities()
self.torrent.resume()
percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT))
size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces']))
expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream.
fp = os.path.join(status['save_path'], f['path'])
tf = TorrentFile(self, fp, f['size'], status['piece_length'], f['offset'])
if len(self.torrent_files[fp]) == 1:
self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk)
tf.prepare_torrent(expected_pieces)
for _ in range(300):
if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces):
break
yield sleep(1)
defer.returnValue(tf)
def sleep(seconds):
d = defer.Deferred()
reactor.callLater(seconds, d.callback, seconds)
@@ -272,6 +383,13 @@ class Core(CorePluginBase):
self.site = server.Site(self.resource)
self.listening = reactor.listenTCP(self.config.config['port'], self.site, interface=self.config.config['ip'])
session = component.get("Core").session
settings = session.get_settings()
settings['prioritize_partial_pieces'] = True
session.set_settings(settings)
self.torrent_handlers = {}
@defer.inlineCallbacks
def disable(self):
@@ -311,59 +429,32 @@ class Core(CorePluginBase):
defer.returnValue(tid)
def get_torrent_handler(self, tid):
if tid not in self.torrent_handlers:
tor = component.get("TorrentManager").torrents.get(tid, None)
if tor is None:
raise UnknownTorrentException()
self.torrent_handlers[tid] = TorrentHandler(tor)
return self.torrent_handlers[tid]
@export
@defer.inlineCallbacks
def stream_torrent(self, tid, filepath):
tor = component.get("TorrentManager").torrents.get(tid, None)
if tor is None: # torrent isn't downloaded yet
try:
torrent_handler = yield self.get_torrent_handler(tid)
except UnknownTorrentException: # torrent isn't added yet
defer.returnValue({'status': 'error', 'message': 'torrent_not_found'})
status = tor.get_status(['piece_length', 'files', 'file_priorities', 'file_progress', 'state', 'save_path'])
pieces = tor.status.pieces
piece_length = status['piece_length']
files = status['files']
for f, priority, progress in zip(files, status['file_priorities'], status['file_progress']):
f['first_piece'] = f['offset'] / piece_length
f['last_piece'] = (f['offset'] + f['size']) / piece_length
f['pieces'] = pieces[f['first_piece']:f['last_piece']+1]
f['priority'] = priority
f['progress'] = progress
f = [f for f in files if f['path'] == filepath]
if not f: # file not found in torrent
try:
tf = yield torrent_handler.get_file(filepath)
except UnknownFileException:
defer.returnValue({'status': 'error', 'message': 'file_not_found'})
f = f[0]
priorities = [0] * len(tor.get_status(['files'])['files'])
priorities[f['index']] = 1
tor.set_file_priorities(priorities)
tor.resume()
EXPECTED_PERCENT = 2.0
EXPECTED_SIZE = 5*1024*1024
percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT))
size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces']))
expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream.
fp = os.path.join(status['save_path'], f['path'])
tf = TorrentFile(tor, fp, f['size'], status['piece_length'], f['offset'])
tf.prepare_torrent(expected_pieces)
for _ in range(300):
if os.path.isfile(fp) and tf.is_buffered(expected_pieces):
break
yield sleep(1)
defer.returnValue({
'status': 'success',
'url': 'http://%s:%s/file/%s/%s' % (self.config.config['ip'], self.config.config['port'],
self.fsr.add_file(tf),
urllib.quote(f['path'].split('/')[-1]))
self.fsr.add_file(tf), os.path.basename(tf.file_path))
})