mirror of
https://github.com/JohnDoee/deluge-streaming/
synced 2026-07-01 07:31:17 -07:00
added a bit of cleanup
This commit is contained in:
@@ -45,6 +45,7 @@ import os
|
||||
import urllib
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from twisted.internet import reactor, defer, task
|
||||
from twisted.python import randbytes
|
||||
@@ -72,6 +73,7 @@ from .filelike import FilelikeObjectResource
|
||||
MIN_QUEUE_CHUNKS = 6
|
||||
EXPECTED_PERCENT = 0.3
|
||||
EXPECTED_SIZE = 5*1024*1024
|
||||
HANDLERS_TIMEOUT = timedelta(minutes=3)
|
||||
|
||||
class UnknownTorrentException(Exception):
|
||||
pass
|
||||
@@ -148,9 +150,11 @@ class StreamResource(Resource):
|
||||
defer.returnValue(json.dumps(result))
|
||||
|
||||
class TorrentFile(object):
|
||||
def __init__(self, torrent_handler, file_path, size, chunk_size, offset):
|
||||
file_handler = None
|
||||
def __init__(self, torrent_handler, file_path, torrent_file_path, size, chunk_size, offset):
|
||||
self.torrent_handler = torrent_handler
|
||||
self.file_path = file_path
|
||||
self.torrent_file_path = torrent_file_path
|
||||
self.first_chunk = offset / chunk_size
|
||||
self.last_chunk = (offset + size) / chunk_size
|
||||
self.chunk_size = chunk_size
|
||||
@@ -159,10 +163,11 @@ class TorrentFile(object):
|
||||
self.last_requested_chunk = self.first_chunk
|
||||
self.is_closed = False
|
||||
self.is_active = False
|
||||
self.last_activity = datetime.now()
|
||||
|
||||
self.first_chunk_end = self.chunk_size * (self.first_chunk + 1) - offset
|
||||
|
||||
self.torrent_handler.torrent_files[self.file_path].append(self)
|
||||
self.torrent_handler.torrent_files[torrent_file_path].append(self)
|
||||
|
||||
def open(self):
|
||||
if self.is_closed:
|
||||
@@ -183,12 +188,13 @@ class TorrentFile(object):
|
||||
self.torrent_handler.schedule_chunk(self.first_chunk, 0)
|
||||
self.torrent_handler.schedule_chunk(self.last_chunk, 1)
|
||||
|
||||
for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:self.first_chunk+buffer_pieces+1], self.first_chunk):
|
||||
for chunk, chunk_status in enumerate(self.torrent_handler.torrent.status.pieces[self.first_chunk:min(self.first_chunk+buffer_pieces, self.last_chunk)+1], self.first_chunk):
|
||||
self.torrent_handler.schedule_chunk(chunk, chunk-self.first_chunk)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def read(self, size=1024):
|
||||
self.is_active = True
|
||||
self.last_activity = datetime.now()
|
||||
|
||||
tell = self.tell()
|
||||
chunk, end_of_chunk = self.get_chunk(tell)
|
||||
@@ -226,24 +232,30 @@ class TorrentFile(object):
|
||||
return self.file_handler.tell()
|
||||
|
||||
def close(self):
|
||||
if not self.is_closed and self in self.torrent_handler.torrent_files[self.file_path]:
|
||||
self.torrent_handler.torrent_files[self.file_path].remove(self)
|
||||
if not self.is_closed and self in self.torrent_handler.torrent_files[self.torrent_file_path]:
|
||||
self.torrent_handler.torrent_files[self.torrent_file_path].remove(self)
|
||||
self.is_closed = True
|
||||
return self.file_handler.close()
|
||||
|
||||
if self.file_handler:
|
||||
return self.file_handler.close()
|
||||
|
||||
def copy(self):
|
||||
tf = TorrentFile(self.torrent_handler, self.file_path, self.size, self.chunk_size, self.offset)
|
||||
tf = TorrentFile(self.torrent_handler, self.file_path, self.torrent_file_path, self.size, self.chunk_size, self.offset)
|
||||
return tf
|
||||
|
||||
class TorrentHandler(object):
|
||||
def __init__(self, torrent):
|
||||
def __init__(self, torrent, torrent_id, core):
|
||||
self.torrent = torrent
|
||||
self.torrent_handle = torrent.handle
|
||||
self.torrent_id = torrent_id
|
||||
self.core = core
|
||||
self.priorities = [0] * len(torrent.get_status(['files'])['files']) # TODO: get current priorities and use those instead
|
||||
self.torrent_files = defaultdict(list)
|
||||
self.priorities_increased = {}
|
||||
# need to blackhole all pieces not downloaded yet
|
||||
self.last_activity = datetime.now()
|
||||
self.update_chunk_priorities()
|
||||
|
||||
|
||||
def is_buffered(self, first_chunk, last_chunk):
|
||||
buffer_status = [x for x in self.torrent.status.pieces[first_chunk:last_chunk+1] if not x]
|
||||
@@ -268,7 +280,9 @@ class TorrentHandler(object):
|
||||
def update_chunk_priority(self, tfs):
|
||||
handled_heads = []
|
||||
for tf in sorted(tfs, key=lambda x:x.last_requested_chunk, reverse=True):
|
||||
if tf.is_closed:
|
||||
if datetime.now() - tf.last_activity > HANDLERS_TIMEOUT:
|
||||
logger.debug('Torrentfile timed out')
|
||||
tf.close()
|
||||
continue
|
||||
|
||||
if tf.last_requested_chunk is not None:
|
||||
@@ -310,9 +324,22 @@ class TorrentHandler(object):
|
||||
|
||||
handled_heads.append(tf.last_requested_chunk)
|
||||
|
||||
return bool(handled_heads)
|
||||
|
||||
def update_chunk_priorities(self):
|
||||
for tfs in self.torrent_files.values():
|
||||
self.update_chunk_priority(tfs)
|
||||
for torrent_file_path, tfs in self.torrent_files.items():
|
||||
if not tfs:
|
||||
del self.torrent_files[torrent_file_path]
|
||||
continue
|
||||
|
||||
if self.update_chunk_priority(tfs):
|
||||
self.last_activity = datetime.now()
|
||||
|
||||
if datetime.now() - self.last_activity > HANDLERS_TIMEOUT:
|
||||
logger.debug('Torrent handler idle, killing myself.')
|
||||
if self.torrent_id in self.core.torrent_handlers:
|
||||
del self.core.torrent_handlers[self.torrent_id]
|
||||
return
|
||||
|
||||
reactor.callLater(2, self.update_chunk_priorities)
|
||||
|
||||
@@ -339,26 +366,29 @@ class TorrentHandler(object):
|
||||
else:
|
||||
raise UnknownFileException()
|
||||
|
||||
fp = os.path.join(status['save_path'], f['path'])
|
||||
|
||||
if progress == 1: # file is complete, no need to fire up all the torrent jazz
|
||||
defer.returnValue(static.File(fp))
|
||||
|
||||
self.priorities[f['index']] = 3
|
||||
self.update_priorities()
|
||||
if progress < 1:
|
||||
self.torrent.resume()
|
||||
|
||||
self.torrent.resume()
|
||||
|
||||
percent_pieces = int(math.ceil((len(f['pieces']) / 100.0) * EXPECTED_PERCENT))
|
||||
size_pieces = int(min(math.ceil((EXPECTED_SIZE * 1.0) / piece_length), f['pieces']))
|
||||
expected_pieces = max(percent_pieces, size_pieces) # we need to download either 5% or 5MB of the file before allowing stream.
|
||||
|
||||
fp = os.path.join(status['save_path'], f['path'])
|
||||
tf = TorrentFile(self, fp, f['path'], f['size'], status['piece_length'], f['offset'])
|
||||
|
||||
tf = TorrentFile(self, fp, f['size'], status['piece_length'], f['offset'])
|
||||
|
||||
if len(self.torrent_files[fp]) == 1:
|
||||
if len(self.torrent_files[f['path']]) == 1:
|
||||
self.blackhole_all_pieces(tf.first_chunk, tf.last_chunk)
|
||||
|
||||
tf.prepare_torrent(expected_pieces)
|
||||
|
||||
for _ in range(300):
|
||||
if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces):
|
||||
if os.path.isfile(fp) and self.is_buffered(tf.first_chunk, tf.first_chunk+expected_pieces) and self.is_buffered(tf.last_chunk, tf.last_chunk):
|
||||
break
|
||||
|
||||
yield sleep(1)
|
||||
@@ -436,7 +466,7 @@ class Core(CorePluginBase):
|
||||
if tor is None:
|
||||
raise UnknownTorrentException()
|
||||
|
||||
self.torrent_handlers[tid] = TorrentHandler(tor)
|
||||
self.torrent_handlers[tid] = TorrentHandler(tor, tid, self)
|
||||
|
||||
return self.torrent_handlers[tid]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user