initial move towards 1.0.0

This commit is contained in:
Anders Jensen
2018-08-10 19:59:26 +02:00
parent cc13b032ea
commit 3af5c420f8
9 changed files with 559 additions and 507 deletions

15
.gitignore vendored
View File

@@ -23,8 +23,13 @@ dropin.cache
_trial_temp _trial_temp
*.komodoproject *.komodoproject
docs/_build* docs/_build*
apiserver/metadata/imdbhandler.py .env*
apiserver/metadata/malhandler.py
apiserver/services/search.py
apiserver/services/control.py # for bundling
apiserver/services/files.py thomas
six.py
rarfile.py
rfc6266.py
lepl
pytz

View File

@@ -42,8 +42,20 @@ make Deluge an abstraction layer for the [TidalStream](http://www.tidalstream.or
The _allow remote_ option is to allow remote add and stream of torrents. The _allow remote_ option is to allow remote add and stream of torrents.
## Todo
* [x] Add RAR streaming support
* [ ] Better feedback in interface about streams
* [ ] Better feedback when using API
* [ ] Reverse proxy improvement (e.g. port different than bind port)
# Version Info # Version Info
## Version 0.10.0
* Rewrote large parts of the code
* Now using [thomas](https://github.com/JohnDoee/thomas) as file-reading core - this adds support for multi-rar streaming.
* Faster streaming by reading directly from disk
## Version 0.9.0 ## Version 0.9.0
* Few bugfixes * Few bugfixes
* Added support for Deluge 2 * Added support for Deluge 2

9
create-egg.sh Normal file
View File

@@ -0,0 +1,9 @@
virtualenv .env-egg
.env-egg/bin/pip install -U thomas
ln -s .env-egg/lib/python2.7/site-packages/thomas .
ln -s .env-egg/lib/python2.7/site-packages/rarfile.py .
ln -s .env-egg/lib/python2.7/site-packages/six.py .
ln -s .env-egg/lib/python2.7/site-packages/rfc6266.py .
ln -s .env-egg/lib/python2.7/site-packages/lepl .
ln -s .env-egg/lib/python2.7/site-packages/pytz .
.env-egg/bin/python setup.py bdist_egg

View File

@@ -37,12 +37,12 @@
# statement from all source files in the program, then also delete it here. # statement from all source files in the program, then also delete it here.
# #
from setuptools import setup from setuptools import setup, find_packages
__plugin_name__ = "Streaming" __plugin_name__ = "Streaming"
__author__ = "Anders Jensen" __author__ = "Anders Jensen"
__author_email__ = "johndoee@tidalstream.org" __author_email__ = "johndoee@tidalstream.org"
__version__ = "0.9.0" __version__ = "0.10.0"
__url__ = "https://github.com/JohnDoee/deluge-streaming" __url__ = "https://github.com/JohnDoee/deluge-streaming"
__license__ = "GPLv3" __license__ = "GPLv3"
__description__ = "Enables streaming of files while downloading them." __description__ = "Enables streaming of files while downloading them."
@@ -64,6 +64,18 @@ downloads ahead, this enables seeking in video files.
If you want to stream from a non-local computer, e.g. your seedbox, you will need to change the IP in option to the external server ip.""" If you want to stream from a non-local computer, e.g. your seedbox, you will need to change the IP in option to the external server ip."""
__pkg_data__ = {__plugin_name__.lower(): ["template/*", "data/*"]} __pkg_data__ = {__plugin_name__.lower(): ["template/*", "data/*"]}
REQUIREMENTS_PACKAGES = [
'thomas',
'lepl',
'pytz',
]
REQUIREMENTS_MODULES = [
'six',
'rarfile',
'rfc6266',
]
setup( setup(
name=__plugin_name__, name=__plugin_name__,
version=__version__, version=__version__,
@@ -73,8 +85,10 @@ setup(
url=__url__, url=__url__,
license=__license__, license=__license__,
long_description=__long_description__ if __long_description__ else __description__, long_description=__long_description__ if __long_description__ else __description__,
# install_requires=REQUIREMENTS_PACKAGES,
packages=[__plugin_name__.lower()], packages=[__plugin_name__.lower()] + ['%s.%s' % (x, y) for x in REQUIREMENTS_PACKAGES for y in find_packages(x)] + REQUIREMENTS_PACKAGES,
py_modules=REQUIREMENTS_MODULES,
package_data = __pkg_data__, package_data = __pkg_data__,
entry_points=""" entry_points="""

View File

@@ -37,30 +37,38 @@
# statement from all source files in the program, then also delete it here. # statement from all source files in the program, then also delete it here.
# #
import base64
import json import json
import logging import logging
import os import os
import urllib import random
import string
import time
import deluge.configmanager import deluge.configmanager
from collections import defaultdict
from copy import copy from copy import copy
from datetime import datetime, timedelta
from deluge import component, configmanager from deluge import component, configmanager
from deluge._libtorrent import lt from deluge._libtorrent import lt
from deluge.core.rpcserver import export from deluge.core.rpcserver import export
from deluge.plugins.pluginbase import CorePluginBase from deluge.plugins.pluginbase import CorePluginBase
from twisted.internet import reactor, defer from twisted.internet import reactor, defer, task
from twisted.python import randbytes from twisted.web import server, client
from twisted.web import server, resource, static, client
from thomas import router, Item, OutputBase
from .filelike import FilelikeObjectResource
from .resource import Resource from .resource import Resource
from .torrentfile import DelugeTorrentInput
logger = logging.getLogger(__name__) defer.setDebugging(True)
router.register_handler(DelugeTorrentInput.plugin_name, DelugeTorrentInput, True, False, False)
VIDEO_STREAMABLE_EXTENSIONS = ['mkv', 'mp4', 'iso', 'ogg', 'ogm', 'm4v']
AUDIO_STREAMABLE_EXTENSIONS = ['flac', 'mp3', 'oga']
STREAMABLE_EXTENSIONS = set(VIDEO_STREAMABLE_EXTENSIONS + AUDIO_STREAMABLE_EXTENSIONS)
TORRENT_CLEANUP_INTERVAL = timedelta(minutes=30)
DEFAULT_PREFS = { DEFAULT_PREFS = {
'ip': '127.0.0.1', 'ip': '127.0.0.1',
@@ -70,21 +78,375 @@ DEFAULT_PREFS = {
'use_stream_urls': False, 'use_stream_urls': False,
'auto_open_stream_urls': False, 'auto_open_stream_urls': False,
'use_ssl': False, 'use_ssl': False,
'remote_username': 'username', 'remote_username': 'stream',
'remote_password': 'password', 'remote_password': ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(16)),
'serve_method': 'standalone', 'serve_method': 'standalone',
'ssl_source': 'daemon', 'ssl_source': 'daemon',
'ssl_priv_key_path': '', 'ssl_priv_key_path': '',
'ssl_cert_path': '', 'ssl_cert_path': '',
} }
PRIORITY_INCREASE = 5 logger = logging.getLogger(__name__)
def sleep(seconds): class Torrent(object):
d = defer.Deferred() def __init__(self, torrent_handler, infohash):
reactor.callLater(seconds, d.callback, seconds) self.torrent_handler = torrent_handler
return d self.infohash = infohash
self.filesets = {}
self.readers = {}
self.cycle_lock = defer.DeferredLock()
self.last_activity = datetime.now()
self.torrent = component.get("TorrentManager").torrents.get(infohash, None)
status = self.torrent.get_status(['piece_length'])
self.piece_length = status['piece_length']
self.torrent.handle.set_sequential_download(True)
self.torrent.handle.set_priority(1)
def get_file_from_offset(self, offset):
status = self.torrent.get_status(['files'])
last_file = None
for f in status['files']:
if f['offset'] > offset:
break
last_file = f
return last_file
def can_read(self, from_byte):
needed_piece, rest = divmod(from_byte, self.piece_length)
if rest:
real_needed_piece = needed_piece
else:
real_needed_piece = needed_piece + 1
last_available_piece = None
for piece, status in enumerate(self.torrent.status.pieces[real_needed_piece:], real_needed_piece):
if not status:
break
last_available_piece = piece
if last_available_piece is None:
logger.debug('Since we are waiting for a piece, setting priority for %s to max' % (real_needed_piece, ))
self.torrent.handle.set_piece_deadline(real_needed_piece, 0)
self.torrent.handle.piece_priority(real_needed_piece, 7)
f = self.get_file_from_offset(from_byte)
logger.debug('Also setting file to max %r' % (f, ))
file_priorities = self.torrent.get_file_priorities()
file_priorities[f['index']] = 7
self.torrent.set_file_priorities(file_priorities)
for _ in range(300):
if self.torrent.status.pieces[real_needed_piece]:
break
if not reactor.running:
return
time.sleep(0.2)
logger.debug('Calling read again to get the real number')
return self.can_read(from_byte)
else:
return ((last_available_piece - needed_piece) * self.piece_length) + rest
def is_idle(self):
return not self.readers and self.last_activity + TORRENT_CLEANUP_INTERVAL < datetime.now()
def add_reader(self, filelike, path, from_byte, to_byte):
logger.debug('Added reader %s path:%s from_byte:%s' % (filelike, path, from_byte, ))
self.readers[filelike] = (path, from_byte, to_byte)
self.cycle()
def remove_reader(self, filelike):
if filelike in self.readers:
logger.debug('Removed reader %s' % (filelike, ))
del self.readers[filelike]
self.cycle()
self.last_activity = datetime.now()
def cycle(self):
@defer.inlineCallbacks
def handle_cycle():
yield self.cycle_lock.acquire()
try:
self._cycle()
except:
logger.exception('Failed to cycle')
self.cycle_lock.release()
reactor.callFromThread(handle_cycle)
def _cycle(self):
logger.debug('Doing a cycle')
found_not_started = False
cannot_blacklist = set()
must_whitelist = set()
first_files = set()
for fileset in self.filesets.values():
logger.debug('Fileset %r' % (fileset, ))
if not fileset['started']:
found_not_started = True
must_whitelist |= set(fileset['files'])
fileset['started'] = True
cannot_blacklist |= set(fileset['files'])
first_files.add(fileset['files'][0])
if found_not_started:
logger.debug('We had a fileset not started, must_whitelist:%r first_files:%r cannot_blacklist:%r' % (must_whitelist, first_files, cannot_blacklist))
status = self.torrent.get_status(['files', 'file_progress'])
file_priorities = self.torrent.get_file_priorities()
for f, progress in zip(status['files'], status['file_progress']):
i = f['index']
if progress == 1.0:
file_priorities[i] = 1
continue
if f['path'] in must_whitelist:
if f['path'] in first_files:
file_priorities[i] = 7
else:
file_priorities[i] = 1
elif f['path'] not in cannot_blacklist:
file_priorities[i] = 0
self.torrent.set_file_priorities(file_priorities)
if self.readers:
status = self.torrent.get_status(['files', 'file_progress'])
file_ranges = {}
fileset_ranges = {}
for path, from_byte, to_byte in self.readers.values():
logger.debug('Reader %s, %s, %s' % (path, from_byte, to_byte, ))
if path in file_ranges:
file_ranges[path] = min(from_byte, file_ranges[path])
else:
file_ranges[path] = from_byte
for fileset_hash, fileset in self.filesets.items():
if path in fileset['files']:
if fileset_hash in fileset_ranges:
fileset_ranges[fileset_hash] = min(fileset_ranges[fileset_hash], fileset['files'].index(path))
else:
fileset_ranges[fileset_hash] = fileset['files'].index(path)
currently_downloading = self.get_currently_downloading()
logger.debug('File heads: %r' % (file_ranges, ))
for f, progress in zip(status['files'], status['file_progress']):
if progress == 1.0:
continue
if f['path'] not in file_ranges:
continue
first_piece = f['offset'] // self.piece_length
current_piece = file_ranges[path] // self.piece_length
last_piece = (f['offset'] + f['size']) // self.piece_length
logger.debug('Configuring pieces first piece %s current piece %s - all before should be blacklisted' % (first_piece, current_piece))
for piece, piece_status in enumerate(self.torrent.status.pieces[first_piece:last_piece], first_piece):
if piece_status or piece in currently_downloading:
continue
priority = self.torrent.handle.piece_priority(piece)
if piece == first_piece:
if priority == 0:
self.torrent.handle.piece_priority(piece, 1)
continue
if piece < current_piece:
self.torrent.handle.piece_priority(piece, 0)
elif piece == current_piece:
self.torrent.handle.piece_priority(piece, 7)
else:
self.torrent.handle.piece_priority(piece, 1)
file_priorities = self.torrent.get_file_priorities()
logger.debug('Fileset heads: %r' % (fileset_ranges, ))
for fileset_hash, first_file in fileset_ranges.items():
fileset = self.filesets[fileset_hash]
logger.debug('From index %s' % (first_file, ))
file_mapping = {f['path']: f['index'] for f in status['files']}
for i, f in enumerate(fileset['files']):
index = file_mapping[f]
if i < first_file:
file_priorities[index] = 0
elif i == first_file:
file_priorities[index] = 7
else:
file_priorities[index] = 1
self.torrent.set_file_priorities(file_priorities)
def get_currently_downloading(self):
currently_downloading = set()
for peer in self.torrent.handle.get_peer_info():
if peer.downloading_piece_index != -1:
currently_downloading.add(peer.downloading_piece_index)
return currently_downloading
def reset_priorities(self):
for piece in range(len(self.torrent.status.pieces)):
self.torrent.handle.piece_priority(piece, 1)
self.torrent.set_file_priorities([1] * len(self.torrent.get_file_priorities()))
def shutdown(self):
logger.debug('Shutting down torrent %r' % (self, ))
for reader in self.readers.keys():
reactor.callInThread(reader.close)
def add_fileset(self, fileset):
files = [f.path for f in fileset]
fileset_hash = hash(','.join(files))
if fileset_hash not in self.filesets:
self.filesets[fileset_hash] = {'started': False, 'files': files}
class TorrentHandler(object):
def __init__(self, reset_priorities_on_finish):
self.torrents = {}
self.reset_priorities_on_finish = reset_priorities_on_finish
self.alerts = component.get("AlertManager")
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
self.alerts.register_handler("torrent_finished_alert", self.on_alert_torrent_finished)
self.cleanup_looping_call = task.LoopingCall(self.cleanup)
self.cleanup_looping_call.start(60)
def on_alert_torrent_removed(self, alert):
try:
infohash = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on torrent remove alert')
return
if infohash not in self.torrents:
return
self.torrents[infohash].shutdown()
del self.torrents[infohash]
def on_alert_torrent_finished(self, alert):
try:
infohash = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on torrent finished alert')
return
if infohash not in self.torrents:
return
if self.reset_priorities_on_finish:
self.torrents[infohash].reset_priorities()
def shutdown(self):
for torrent in self.torrents.values():
if self.reset_priorities_on_finish:
torrent.reset_priorities()
torrent.shutdown()
self.cleanup_looping_call.stop()
def get_filesystem(self, infohash):
torrent = component.get("TorrentManager").torrents.get(infohash, None)
status = torrent.get_status(['piece_length', 'files', 'file_progress', 'save_path'])
self.piece_length = status['piece_length']
save_path = status['save_path']
found_rar = False
path_item_mapping = {}
for f, progress in zip(status['files'], status['file_progress']):
full_path = os.path.join(save_path, f['path'])
if '/' in f['path']:
path, fn = f['path'].rsplit('/', 1)
else:
fn = f['path']
path = ''
item = Item(fn, attributes={'size': f['size']})
item.readable = True
item.streamable = True
path_item_mapping.setdefault(path, []).append(item)
if progress == 1.0:
item.add_route('file', True, False, False, kwargs={'path': full_path})
else:
item.add_route('torrent_file', True, False, False, kwargs={
'torrent_handler': self,
'infohash': infohash,
'offset': f['offset'],
'path': full_path,
})
item.add_route('direct', False, False, True)
if not found_rar and fn.split('.')[-1].lower() == 'rar':
found_rar = True
path_mapping = {}
for path, items in path_item_mapping.items():
combined_path = []
for path_part in (path + '/').split('/'):
partial_path = '/'.join(combined_path)
if partial_path not in path_mapping:
item = path_mapping[partial_path] = Item(partial_path.split('/')[-1])
item.streamable = True
item.add_route('direct', False, False, True, kwargs={'allowed_extensions': STREAMABLE_EXTENSIONS})
if found_rar:
item.add_route('rar', False, False, True, kwargs={'lazy': True})
if combined_path:
parent_path = '/'.join(combined_path[:-1])
path_mapping[parent_path].add_item(item)
combined_path.append(path_part)
for item in items:
path_mapping[path].add_item(item)
item = path_mapping[''].list()[0] # TODO: make not use an empty item
item.parent_item = None
return item
def get_torrent(self, infohash):
return self.torrents[infohash]
def stream(self, infohash, path):
logger.debug('Trying to get path:%s from infohash:%s' % (path, infohash))
if infohash not in self.torrents:
self.torrents[infohash] = Torrent(self, infohash)
filesystem = self.get_filesystem(infohash)
if path:
stream_item = filesystem.get_item_from_path(path)
else:
stream_item = filesystem
logger.debug('Stream, path:%s infohash:%s stream_item:%r' % (path, infohash, stream_item))
if stream_item is None:
return None
stream_result = stream_item.stream()
logger.debug('Streamresult, path:%s infohash:%s stream_result:%r' % (path, infohash, stream_result))
if stream_result is None:
return None
if hasattr(stream_result, 'get_read_items'):
self.torrents[infohash].add_fileset(stream_result.get_read_items())
else:
self.torrents[infohash].add_fileset([stream_result])
return stream_result
def cleanup(self):
for infohash, torrent in self.torrents.items():
if torrent.is_idle():
logger.debug('Torrent %s is idle, killing it' % (torrent, ))
torrent.shutdown()
del self.torrents[infohash]
class ServerContextFactory(object): class ServerContextFactory(object):
@@ -106,35 +468,6 @@ class ServerContextFactory(object):
return ctx return ctx
class FileServeResource(resource.Resource):
isLeaf = True
def __init__(self):
self.file_mapping = {}
resource.Resource.__init__(self)
def generate_secure_token(self):
return base64.urlsafe_b64encode(randbytes.RandomFactory().secureRandom(21, True))
def add_file(self, path):
token = self.generate_secure_token()
self.file_mapping[token] = path
return token
def render_GET(self, request):
key = request.postpath[0]
if key not in self.file_mapping:
return resource.NoResource().render(request)
f = self.file_mapping[key]
if f.is_complete():
return static.File(f.full_path).render_GET(request)
else:
tfr = f.open()
return FilelikeObjectResource(tfr, f.size).render_GET(request)
class StreamResource(Resource): class StreamResource(Resource):
isLeaf = True isLeaf = True
@@ -185,413 +518,6 @@ class StreamResource(Resource):
defer.returnValue(json.dumps(result)) defer.returnValue(json.dumps(result))
class UnknownTorrentException(Exception):
pass
class UnknownFileException(Exception):
pass
class TorrentFileReader(object):
def __init__(self, torrent_file):
self.torrent_file = torrent_file
self.size = torrent_file.size
self.position = 0
self.waiting_for_piece = None
self.current_piece = None
self.current_piece_data = None
@defer.inlineCallbacks
def read(self, size=1024):
required_piece, read_position = self.torrent_file.get_piece_info(self.position)
if self.current_piece != required_piece:
logger.debug('We are missing piece %i and it is required, requesting' % (required_piece, ))
self.waiting_for_piece = required_piece
self.current_piece_data = yield self.torrent_file.get_piece_data(required_piece)
self.current_piece = required_piece
self.waiting_for_piece = None
logger.debug('We can read from local piece from %s size %s from position %s - size of current payload %s' % (read_position, size, self.position, len(self.current_piece_data)))
data = self.current_piece_data[read_position:read_position+size]
self.position += len(data)
defer.returnValue(data)
def tell(self):
return self.position
def close(self):
self.torrent_file.close(self)
def seek(self, offset, whence=os.SEEK_SET):
self.position = offset
class TorrentFile(object): # can be read from, knows about itself
def __init__(self, torrent, first_piece, last_piece, piece_size, offset, path, full_path, size, index):
self.torrent = torrent
self.first_piece = first_piece
self.last_piece = last_piece
self.piece_size = piece_size
self.offset = offset
self.path = path
self.size = size
self.full_path = full_path
self.index = index
self.file_requested = False
self.file_requested_once = False
self.do_shutdown = False
self.first_piece_end = self.piece_size * (self.first_piece + 1) - offset
self.waiting_pieces = {}
self.current_readers = []
self.registered_alert = False
self.alerts = component.get("AlertManager")
def open(self):
"""
Returns a filelike object
"""
if not self.registered_alert:
self.alerts.register_handler("read_piece_alert", self.on_alert_got_piece_data)
self.registered_alert = True
tfr = TorrentFileReader(self)
self.current_readers.append(tfr)
self.file_requested = False
return tfr
def close(self, tfr):
self.current_readers.remove(tfr)
def is_complete(self):
torrent_status = self.torrent.torrent.get_status(['file_progress', 'state'])
file_progress = torrent_status['file_progress']
return file_progress and file_progress[self.index] == 1.0
def get_piece_info(self, tell):
return divmod((self.offset + tell), self.piece_size)
def on_alert_got_piece_data(self, alert):
torrent_id = str(alert.handle.info_hash())
if torrent_id != self.torrent.infohash:
return
logger.debug('Got piece data for piece %s' % alert.piece)
if alert.piece not in self.waiting_pieces:
logger.debug('Got data for piece %i, but no data needed for this piece?' % alert.piece)
return
if alert.buffer is None:
return
piece_data = copy(alert.buffer)
cbs = self.waiting_pieces.pop(alert.piece, [])
for cb in cbs:
cb.callback(piece_data)
@defer.inlineCallbacks
def wait_for_end_pieces(self):
handle = self.torrent.torrent.handle
for piece in [self.first_piece, self.last_piece]:
handle.set_piece_deadline(piece, 0)
handle.piece_priority(piece, 7)
while not handle.have_piece(self.first_piece) and not handle.have_piece(self.last_piece):
if self.do_shutdown:
raise Exception('Shutting down')
logger.debug('Did not have piece %i, waiting' % piece)
yield sleep(1)
@defer.inlineCallbacks
def get_piece_data(self, piece):
logger.debug('Trying to get piece data for piece %s' % piece)
for reader in self.current_readers:
if reader.current_piece == piece:
defer.returnValue(reader.current_piece_data)
if piece not in self.waiting_pieces:
created_waiting_defer = True
self.waiting_pieces[piece] = []
else:
created_waiting_defer = False
d = defer.Deferred()
self.waiting_pieces[piece].append(d)
logger.debug('Waiting for %s' % piece)
while not self.torrent.torrent.handle.have_piece(piece):
if self.do_shutdown:
raise Exception('Shutting down')
logger.debug('Did not have piece %i, waiting' % piece)
yield sleep(1)
if created_waiting_defer:
self.torrent.torrent.handle.read_piece(piece)
data = yield d
logger.debug('Done waiting for piece %i, returning data' % piece)
defer.returnValue(data)
def shutdown(self):
self.do_shutdown = True
class Torrent(object):
def __init__(self, torrent_handler, infohash):
self.infohash = infohash
self.torrent = component.get("TorrentManager").torrents.get(infohash, None)
self.torrent_handler = torrent_handler
if not self.torrent:
raise UnknownTorrentException('%s is not a known infohash' % infohash)
self.torrent_files = None
self.priority_increased = defaultdict(set)
self.do_shutdown = False
self.torrent_released = True # set to True if all the files are set to download
self.populate_files()
self.file_priorities = [0] * len(self.torrent_files)
self.last_piece = self.torrent_files[-1].last_piece
self.torrent.handle.set_sequential_download(True)
self.torrent.handle.set_priority(1)
reactor.callLater(0, self.update_piece_priority)
def populate_files(self):
self.torrent_files = []
status = self.torrent.get_status(['piece_length', 'files', 'save_path'])
piece_length = status['piece_length']
files = status['files']
save_path = status['save_path']
for f in files:
first_piece = f['offset'] / piece_length
last_piece = (f['offset'] + f['size']) / piece_length
full_path = os.path.join(save_path, f['path'])
self.torrent_files.append(TorrentFile(self, first_piece, last_piece, piece_length, f['offset'],
f['path'], full_path, f['size'], f['index']))
return files
def find_file(self, file_or_index=None, includes_name=False):
best_file = None
biggest_file_size = 0
for i, f in enumerate(self.torrent_files):
path = f.path
if not includes_name and '/' in path:
path = '/'.join(path.split('/')[1:])
logger.debug('Testing file %r against %s / %r' % (file_or_index, i, path))
if file_or_index is not None:
if i == file_or_index or path == file_or_index:
best_file = f
break
else:
if f.size > biggest_file_size:
best_file = f
biggest_file_size = f.size
return best_file
def get_file(self, file_or_index=None, includes_name=False):
f = self.find_file(file_or_index, includes_name)
if f is None:
raise UnknownFileException('Was unable to find %s' % file_or_index)
return f
def get_currently_downloading(self):
currently_downloading = set()
for peer in self.torrent.handle.get_peer_info():
if peer.downloading_piece_index != -1:
currently_downloading.add(peer.downloading_piece_index)
return currently_downloading
def get_torrent_file(self, file_or_index, includes_name):
f = self.get_file(file_or_index, includes_name)
f.file_requested = True
f.file_requested_once = True
self.torrent.resume()
should_update_priorities = False
if self.file_priorities[f.index] == 0:
self.file_priorities[f.index] = 3
should_update_priorities = True
if self.torrent_released:
should_update_priorities = True
if should_update_priorities and not f.is_complete(): # Need to do this stuff on seek too
self.torrent.set_file_priorities(self.file_priorities)
return f
def shutdown(self):
logger.info('Shutting down torrent %s' % (self.infohash, ))
self.torrent.handle.set_priority(0)
for piece, status in enumerate(self.torrent.status.pieces[0:self.last_piece+1]):
if status:
continue
priority = self.torrent.handle.piece_priority(piece)
if priority == 0:
self.torrent.handle.piece_priority(piece, 1)
if not self.torrent_handler.config['download_only_streamed']:
logger.debug('Resetting file priorities')
file_priorities = [(1 if fp == 0 else fp) for fp in self.file_priorities]
self.torrent.set_file_priorities(file_priorities)
self.do_shutdown = True
self.torrent_handler.remove_torrent(self.infohash)
for tf in self.torrent_files:
tf.shutdown()
def update_piece_priority(self): # if file streamed has reached end, unblacklist all prior pieces
if self.do_shutdown:
return
logger.debug('Updating piece priority for %s' % (self.infohash, ))
currently_downloading = self.get_currently_downloading()
for f in self.torrent_files:
if not f.file_requested and not f.current_readers: # nobody wants the file and nobody is watching
continue
logger.debug('Rescheduling file %s' % (f.path, ))
heads = set()
if f.file_requested: # we expect a piece head to be at start
heads.add(f.first_piece)
waiting_for_pieces = set()
for tfr in f.current_readers:
if tfr.waiting_for_piece is not None:
waiting_for_pieces.add(tfr.waiting_for_piece)
piece = max(tfr.waiting_for_piece, tfr.current_piece)
if piece is not None:
heads.add(piece)
if not heads:
continue
first_head = min(heads)
for head_piece in heads:
priority_increased = 0
for piece, status in enumerate(self.torrent.status.pieces[head_piece:f.last_piece+1], head_piece):
if status or piece in currently_downloading:
continue
priority = self.torrent.handle.piece_priority(piece)
if priority_increased < PRIORITY_INCREASE:
priority_increased += 1
if piece in waiting_for_pieces:
if priority < 7:
logger.debug('setting priority for %s to 7 with deadline 0' % (piece, ))
self.torrent.handle.set_piece_deadline(piece, 0)
self.torrent.handle.piece_priority(piece, 7)
elif priority < 6:
deadline = 3000 * priority_increased
logger.debug('setting priority for %s to 6 with deadline %s' % (piece, deadline, ))
self.torrent.handle.piece_priority(piece, 6)
self.torrent.handle.set_piece_deadline(piece, deadline)
elif priority == 0:
self.torrent.handle.piece_priority(piece, 1)
if head_piece == first_head:
if priority_increased < PRIORITY_INCREASE:
logger.debug('Everything we need has been scheduled, looking for pieces across file to unblacklist')
for piece, status in enumerate(self.torrent.status.pieces[f.first_piece:f.last_piece+1], f.first_piece):
if status:
continue
priority = self.torrent.handle.piece_priority(piece)
if priority == 0:
self.torrent.handle.piece_priority(piece, 1)
else:
logger.debug('Looking for pieces before smallest head %s to blacklist' % (first_head, ))
for piece, status in enumerate(self.torrent.status.pieces[f.first_piece:first_head], f.first_piece):
if status or piece in currently_downloading:
continue
if self.torrent.handle.piece_priority(piece) != 0:
logger.debug('Blacklisting %i' % (piece, ))
self.torrent.handle.piece_priority(piece, 0)
found_requested = False
for f in self.torrent_files:
if f.file_requested_once:
found_requested = True
if not f.is_complete() or f.current_readers:
break
else:
if found_requested:
logger.debug('Nobody is currently using %s, shutting down torrent-handler' % (self.infohash, ))
self.shutdown()
reactor.callLater(1, self.update_piece_priority)
class TorrentHandler(object):
def __init__(self, config):
self.torrents = {}
self.config = config
self.alerts = component.get("AlertManager")
self.alerts.register_handler("torrent_removed_alert", self.on_alert_torrent_removed)
def get_stream(self, infohash, file_or_index=None, includes_name=False):
logger.info('Trying to stream infohash %s and file %s include_name %s' % (infohash, file_or_index, includes_name))
if infohash not in self.torrents:
self.torrents[infohash] = Torrent(self, infohash)
return self.torrents[infohash].get_torrent_file(file_or_index, includes_name)
def on_alert_torrent_removed(self, alert):
try:
torrent_id = str(alert.handle.info_hash())
except (RuntimeError, KeyError):
logger.warning('Failed to handle on torrent remove alert')
return
if torrent_id not in self.torrents:
return
self.torrents[torrent_id].shutdown()
self.remove_torrent(torrent_id)
def remove_torrent(self, torrent_id):
del self.torrents[torrent_id]
def shutdown(self):
logger.debug('Shutting down TorrentHandler')
self.alerts.deregister_handler(self.on_alert_torrent_removed)
for torrent in self.torrents.values():
torrent.shutdown()
class Core(CorePluginBase): class Core(CorePluginBase):
listening = None listening = None
base_url = None base_url = None
@@ -607,9 +533,14 @@ class Core(CorePluginBase):
except AttributeError: except AttributeError:
logger.warning('Unable to exclude partial pieces') logger.warning('Unable to exclude partial pieces')
self.fsr = FileServeResource() http_output_cls = OutputBase.find_plugin('http')
http_output = http_output_cls(url_prefix='file')
http_output.start()
self.thomas_http_output = http_output
resource = Resource() resource = Resource()
resource.putChild('file', self.fsr) resource.putChild('file', http_output.resource)
if self.config['allow_remote']: if self.config['allow_remote']:
resource.putChild('stream', StreamResource(username=self.config['remote_username'], resource.putChild('stream', StreamResource(username=self.config['remote_username'],
password=self.config['remote_password'], password=self.config['remote_password'],
@@ -619,7 +550,7 @@ class Core(CorePluginBase):
base_resource.putChild('streaming', resource) base_resource.putChild('streaming', resource)
self.site = server.Site(base_resource) self.site = server.Site(base_resource)
self.torrent_handler = TorrentHandler(self.config) self.torrent_handler = TorrentHandler(self.config['download_only_streamed'] == False)
plugin_manager = component.get("CorePluginManager") plugin_manager = component.get("CorePluginManager")
logger.warning('plugins %s' % (plugin_manager.get_enabled_plugins(), )) logger.warning('plugins %s' % (plugin_manager.get_enabled_plugins(), ))
@@ -673,6 +604,7 @@ class Core(CorePluginBase):
def disable(self): def disable(self):
self.site.stopFactory() self.site.stopFactory()
self.torrent_handler.shutdown() self.torrent_handler.shutdown()
self.thomas_http_output.stop()
if self.check_webui(): if self.check_webui():
plugin_manager = component.get("CorePluginManager") plugin_manager = component.get("CorePluginManager")
@@ -733,9 +665,10 @@ class Core(CorePluginBase):
@export @export
@defer.inlineCallbacks @defer.inlineCallbacks
def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None, includes_name=False, wait_for_end_pieces=False): def stream_torrent(self, infohash=None, url=None, filedump=None, filepath_or_index=None, includes_name=False, wait_for_end_pieces=False):
tor = component.get("TorrentManager").torrents.get(infohash, None) logger.debug('Trying to stream infohash:%s, url:%s, filepath_or_index:%s' % (infohash, url, filepath_or_index))
torrent = component.get("TorrentManager").torrents.get(infohash, None)
if tor is None: if torrent is None:
logger.info('Did not find torrent, must add it') logger.info('Did not find torrent, must add it')
if not filedump and url: if not filedump and url:
@@ -751,23 +684,28 @@ class Core(CorePluginBase):
try: try:
yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True}) yield core.add_torrent_file('file.torrent', filedump.encode('base64'), {'add_paused': True})
except: except:
logger.exception('Failed to add torrent')
defer.returnValue({'status': 'error', 'message': 'failed to add torrent'}) defer.returnValue({'status': 'error', 'message': 'failed to add torrent'})
if filepath_or_index is None:
fn = ''
elif isinstance(filepath_or_index, int):
status = torrent.get_status(['files'])
fn = status['files'][filepath_or_index]['path']
else:
fn = filepath_or_index
try: try:
tf = self.torrent_handler.get_stream(infohash, filepath_or_index, includes_name) stream_or_item = self.torrent_handler.stream(infohash, fn)
except UnknownTorrentException: stream_url = self.thomas_http_output.serve_item(stream_or_item)
defer.returnValue({'status': 'error', 'message': 'unable to find torrent, probably failed to add it'}) except:
logger.exception('Failed to stream torrent')
defer.returnValue({'status': 'error', 'message': 'failed to stream torrent'})
if wait_for_end_pieces:
logger.debug('Waiting for end pieces')
yield tf.wait_for_end_pieces()
filename = os.path.basename(tf.path).encode('utf-8')
defer.returnValue({ defer.returnValue({
'status': 'success', 'status': 'success',
'filename': filename, 'filename': stream_url.split('/')[-1],
'use_stream_urls': self.config['use_stream_urls'], 'use_stream_urls': self.config['use_stream_urls'],
'auto_open_stream_urls': self.config['auto_open_stream_urls'], 'auto_open_stream_urls': self.config['auto_open_stream_urls'],
'url': '%s/streaming/file/%s/%s' % (self.base_url, self.fsr.add_file(tf), 'url': '%s/streaming/%s' % (self.base_url, stream_url.lstrip('/'))
urllib.quote_plus(filename))
}) })

View File

@@ -26,19 +26,26 @@
<property name="can_focus">False</property> <property name="can_focus">False</property>
<property name="spacing">5</property> <property name="spacing">5</property>
<child> <child>
<widget class="GtkCheckButton" id="input_download_only_streamed"> <widget class="GtkVBox" id="settings_vbox">
<property name="label" translatable="yes">Download only streamed files, skip the other files</property>
<property name="visible">True</property> <property name="visible">True</property>
<property name="can_focus">True</property> <property name="can_focus">False</property>
<property name="receives_default">False</property> <property name="spacing">5</property>
<property name="use_action_appearance">False</property> <child>
<property name="draw_indicator">True</property> <widget class="GtkCheckButton" id="input_download_only_streamed">
<property name="label" translatable="yes">Download only streamed files, skip the other files</property>
<property name="visible">True</property>
<property name="can_focus">True</property>
<property name="receives_default">False</property>
<property name="use_action_appearance">False</property>
<property name="draw_indicator">True</property>
</widget>
<packing>
<property name="expand">False</property>
<property name="fill">False</property>
<property name="position">0</property>
</packing>
</child>
</widget> </widget>
<packing>
<property name="expand">False</property>
<property name="fill">False</property>
<property name="position">0</property>
</packing>
</child> </child>
</widget> </widget>
</child> </child>
@@ -159,10 +166,10 @@
<property name="position">1</property> <property name="position">1</property>
</packing> </packing>
</child> </child>
<child> <!-- <child>
<widget class="GtkRadioButton" id="input_serve_webui"> <widget class="GtkRadioButton" id="input_serve_webui">
<property name="label" translatable="yes">Serve files via WebUI</property> <property name="label" translatable="yes">Serve files via WebUI</property>
<property name="visible">True</property> <property name="visible">False</property>
<property name="sensitive">False</property> <property name="sensitive">False</property>
<property name="can_focus">True</property> <property name="can_focus">True</property>
<property name="receives_default">False</property> <property name="receives_default">False</property>
@@ -175,16 +182,16 @@
<property name="fill">True</property> <property name="fill">True</property>
<property name="position">2</property> <property name="position">2</property>
</packing> </packing>
</child> </child> -->
<child> <child>
<widget class="GtkVBox" id="settings_vbox3"> <widget class="GtkVBox" id="settings_vbox3">
<property name="visible">True</property> <property name="visible">True</property>
<property name="can_focus">False</property> <property name="can_focus">False</property>
<property name="spacing">5</property> <property name="spacing">5</property>
<child> <!-- <child>
<widget class="GtkRadioButton" id="input_serve_standalone"> <widget class="GtkRadioButton" id="input_serve_standalone">
<property name="label" translatable="yes">Serve files via standalone</property> <property name="label" translatable="yes">Serve files via standalone</property>
<property name="visible">True</property> <property name="visible">False</property>
<property name="sensitive">False</property> <property name="sensitive">False</property>
<property name="can_focus">True</property> <property name="can_focus">True</property>
<property name="receives_default">False</property> <property name="receives_default">False</property>
@@ -198,12 +205,12 @@
<property name="fill">True</property> <property name="fill">True</property>
<property name="position">0</property> <property name="position">0</property>
</packing> </packing>
</child> </child> -->
<child> <child>
<widget class="GtkAlignment" id="remote_alignment1"> <widget class="GtkAlignment" id="remote_alignment1">
<property name="visible">True</property> <property name="visible">True</property>
<property name="can_focus">False</property> <property name="can_focus">False</property>
<property name="left_padding">20</property> <!-- <property name="left_padding">20</property> -->
<child> <child>
<widget class="GtkVBox" id="remote_vbox1"> <widget class="GtkVBox" id="remote_vbox1">
<property name="visible">True</property> <property name="visible">True</property>
@@ -436,7 +443,7 @@
<property name="visible">True</property> <property name="visible">True</property>
<property name="can_focus">False</property> <property name="can_focus">False</property>
<property name="spacing">5</property> <property name="spacing">5</property>
<child> <!-- <child>
<widget class="GtkHBox" id="remote_username_hbox1"> <widget class="GtkHBox" id="remote_username_hbox1">
<property name="visible">True</property> <property name="visible">True</property>
<property name="can_focus">False</property> <property name="can_focus">False</property>
@@ -476,7 +483,7 @@
<property name="fill">False</property> <property name="fill">False</property>
<property name="position">0</property> <property name="position">0</property>
</packing> </packing>
</child> </child> -->
<child> <child>
<widget class="GtkHBox" id="remote_password_hbox1"> <widget class="GtkHBox" id="remote_password_hbox1">
<property name="visible">True</property> <property name="visible">True</property>
@@ -498,7 +505,7 @@
<widget class="GtkEntry" id="input_remote_password"> <widget class="GtkEntry" id="input_remote_password">
<property name="visible">True</property> <property name="visible">True</property>
<property name="can_focus">True</property> <property name="can_focus">True</property>
<property name="visibility">False</property> <property name="visibility">True</property>
<property name="invisible_char">•</property> <property name="invisible_char">•</property>
<property name="invisible_char_set">True</property> <property name="invisible_char_set">True</property>
<property name="primary_icon_activatable">False</property> <property name="primary_icon_activatable">False</property>

View File

@@ -124,10 +124,11 @@ class GtkUI(GtkPluginBase):
def on_apply_prefs(self): def on_apply_prefs(self):
log.debug("applying prefs for Streaming") log.debug("applying prefs for Streaming")
if self.glade.get_widget("input_serve_standalone").get_active(): serve_method = 'standalone'
serve_method = 'standalone' # if self.glade.get_widget("input_serve_standalone").get_active():
elif self.glade.get_widget("input_serve_webui").get_active(): # serve_method = 'standalone'
serve_method = 'webui' # elif self.glade.get_widget("input_serve_webui").get_active():
# serve_method = 'webui'
if self.glade.get_widget("input_ssl_cert_daemon").get_active(): if self.glade.get_widget("input_ssl_cert_daemon").get_active():
ssl_source = 'daemon' ssl_source = 'daemon'
@@ -141,8 +142,9 @@ class GtkUI(GtkPluginBase):
"auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(), "auto_open_stream_urls": self.glade.get_widget("input_auto_open_stream_urls").get_active(),
"allow_remote": self.glade.get_widget("input_allow_remote").get_active(), "allow_remote": self.glade.get_widget("input_allow_remote").get_active(),
"download_only_streamed": self.glade.get_widget("input_download_only_streamed").get_active(), "download_only_streamed": self.glade.get_widget("input_download_only_streamed").get_active(),
# "download_in_order": self.glade.get_widget("input_download_in_order").get_active(),
"use_ssl": self.glade.get_widget("input_use_ssl").get_active(), "use_ssl": self.glade.get_widget("input_use_ssl").get_active(),
"remote_username": self.glade.get_widget("input_remote_username").get_text(), # "remote_username": self.glade.get_widget("input_remote_username").get_text(),
"remote_password": self.glade.get_widget("input_remote_password").get_text(), "remote_password": self.glade.get_widget("input_remote_password").get_text(),
"ssl_priv_key_path": self.glade.get_widget("input_ssl_priv_key_path").get_text(), "ssl_priv_key_path": self.glade.get_widget("input_ssl_priv_key_path").get_text(),
"ssl_cert_path": self.glade.get_widget("input_ssl_cert_path").get_text(), "ssl_cert_path": self.glade.get_widget("input_ssl_cert_path").get_text(),
@@ -173,18 +175,20 @@ class GtkUI(GtkPluginBase):
self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"]) self.glade.get_widget("input_allow_remote").set_active(config["allow_remote"])
self.glade.get_widget("input_use_ssl").set_active(config["use_ssl"]) self.glade.get_widget("input_use_ssl").set_active(config["use_ssl"])
self.glade.get_widget("input_download_only_streamed").set_active(config["download_only_streamed"]) self.glade.get_widget("input_download_only_streamed").set_active(config["download_only_streamed"])
self.glade.get_widget("input_remote_username").set_text(config["remote_username"]) # self.glade.get_widget("input_download_in_order").set_active(config["download_in_order"])
# self.glade.get_widget("input_download_everything").set_active(not config["download_in_order"] and not config["download_only_streamed"])
# self.glade.get_widget("input_remote_username").set_text(config["remote_username"])
self.glade.get_widget("input_remote_password").set_text(config["remote_password"]) self.glade.get_widget("input_remote_password").set_text(config["remote_password"])
self.glade.get_widget("input_ssl_priv_key_path").set_text(config["ssl_priv_key_path"]) self.glade.get_widget("input_ssl_priv_key_path").set_text(config["ssl_priv_key_path"])
self.glade.get_widget("input_ssl_cert_path").set_text(config["ssl_cert_path"]) self.glade.get_widget("input_ssl_cert_path").set_text(config["ssl_cert_path"])
self.glade.get_widget("input_serve_standalone").set_active(config["serve_method"] == "standalone") # self.glade.get_widget("input_serve_standalone").set_active(config["serve_method"] == "standalone")
self.glade.get_widget("input_serve_webui").set_active(config["serve_method"] == "webui") # self.glade.get_widget("input_serve_webui").set_active(config["serve_method"] == "webui")
self.glade.get_widget("input_ssl_cert_daemon").set_active(config["ssl_source"] == "daemon") self.glade.get_widget("input_ssl_cert_daemon").set_active(config["ssl_source"] == "daemon")
self.glade.get_widget("input_ssl_cert_custom").set_active(config["ssl_source"] == "custom") self.glade.get_widget("input_ssl_cert_custom").set_active(config["ssl_source"] == "custom")
api_url = 'http%s://%s:%s/streaming/stream' % (('s' if config["use_ssl"] else ''), config["ip"], config["port"]) api_url = 'http%s://%s:%s@%s:%s/streaming/stream' % (('s' if config["use_ssl"] else ''), config["remote_username"], config["remote_password"], config["ip"], config["port"])
self.glade.get_widget("output_remote_url").set_text(api_url) self.glade.get_widget("output_remote_url").set_text(api_url)
def stream_ready(self, result): def stream_ready(self, result):

View File

@@ -29,8 +29,6 @@ class Resource(TwistedResource):
authenticated = True authenticated = True
if not authenticated: if not authenticated:
print auth_header
print self.username, self.password
request.setResponseCode(401) request.setResponseCode(401)
return 'Unauthorized' return 'Unauthorized'

65
streaming/torrentfile.py Normal file
View File

@@ -0,0 +1,65 @@
import logging
import mimetypes
import os
from thomas import InputBase
logger = logging.getLogger(__name__)
class DelugeTorrentInput(InputBase.find_plugin('file')):
plugin_name = 'torrent_file'
protocols = []
can_read_to = None
def __init__(self, item, torrent_handler, infohash, offset, path):
self.item = item
self.torrent_handler = torrent_handler
self.torrent = torrent_handler.get_torrent(infohash)
self.infohash = infohash
self.offset = offset
self.path = path
self.size, self.filename, self.content_type = self.get_info()
def get_info(self):
logger.info('Getting info about %r' % (self.path, ))
content_type = mimetypes.guess_type(self.path)[0] or 'bytes'
return self.item['size'], os.path.basename(self.path), content_type
def ensure_exists(self):
if not os.path.exists(self.path):
self.torrent.can_read(self.offset)
def seek(self, pos):
self.ensure_exists()
super(DelugeTorrentInput, self).seek(pos)
logger.debug('Seeking at %s torrentfile_id %r' % (self.tell(), id(self)))
self.torrent.add_reader(self, self.item.path, self.offset + self.tell(), self.offset + self.size)
def read(self, num):
self.ensure_exists()
if not self._open_file:
self.seek(0)
#logger.debug('Trying to read %s from %i torrentfile_id %r' % (self.path, self.tell(), id(self)))
tell = self.tell()
if self.can_read_to <= tell or self.can_read_to is None:
self.can_read_to = self.torrent.can_read(self.offset + tell) + tell
real_num = min(num, self.can_read_to - tell)
if num != real_num:
logger.info('The real number we can read to is %s and not %s at position %s' % (real_num, num, tell))
if not self._open_file: # the file was closed while we waited
return b''
data = super(DelugeTorrentInput, self).read(real_num)
return data
def close(self):
self.torrent.remove_reader(self)
super(DelugeTorrentInput, self).close()