mirror of
https://github.com/JohnDoee/deluge-streaming/
synced 2026-07-01 07:31:17 -07:00
fixed bug where multiple readers of same piece caused errors
This commit is contained in:
@@ -207,7 +207,7 @@ class TorrentFileReader(object):
|
||||
self.current_piece = required_piece
|
||||
self.waiting_for_piece = None
|
||||
|
||||
logger.debug('We can read from local piece from %s size %s from position %s' % (read_position, size, self.position))
|
||||
logger.debug('We can read from local piece from %s size %s from position %s - size of current payload %s' % (read_position, size, self.position, len(self.current_piece_data)))
|
||||
data = self.current_piece_data[read_position:read_position+size]
|
||||
self.position += len(data)
|
||||
|
||||
@@ -265,7 +265,7 @@ class TorrentFile(object): # can be read from, knows about itself
|
||||
def is_complete(self):
|
||||
torrent_status = self.torrent.torrent.get_status(['file_progress', 'state'])
|
||||
file_progress = torrent_status['file_progress']
|
||||
return file_progress[self.index] == 1.0
|
||||
return file_progress and file_progress[self.index] == 1.0
|
||||
|
||||
def get_piece_info(self, tell):
|
||||
return divmod((self.offset + tell), self.piece_size)
|
||||
@@ -284,7 +284,10 @@ class TorrentFile(object): # can be read from, knows about itself
|
||||
return
|
||||
|
||||
piece_data = copy(alert.buffer)
|
||||
self.waiting_pieces[alert.piece].callback(piece_data)
|
||||
cbs = self.waiting_pieces.pop(alert.piece, [])
|
||||
|
||||
for cb in cbs:
|
||||
cb.callback(piece_data)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def wait_for_end_pieces(self):
|
||||
@@ -307,7 +310,13 @@ class TorrentFile(object): # can be read from, knows about itself
|
||||
defer.returnValue(reader.current_piece_data)
|
||||
|
||||
if piece not in self.waiting_pieces:
|
||||
self.waiting_pieces[piece] = defer.Deferred()
|
||||
created_waiting_defer = True
|
||||
self.waiting_pieces[piece] = []
|
||||
else:
|
||||
created_waiting_defer = False
|
||||
|
||||
d = defer.Deferred()
|
||||
self.waiting_pieces[piece].append(d)
|
||||
|
||||
logger.debug('Waiting for %s' % piece)
|
||||
while not self.torrent.torrent.handle.have_piece(piece):
|
||||
@@ -316,11 +325,10 @@ class TorrentFile(object): # can be read from, knows about itself
|
||||
logger.debug('Did not have piece %i, waiting' % piece)
|
||||
yield sleep(1)
|
||||
|
||||
self.torrent.torrent.handle.read_piece(piece)
|
||||
if created_waiting_defer:
|
||||
self.torrent.torrent.handle.read_piece(piece)
|
||||
|
||||
data = yield self.waiting_pieces[piece]
|
||||
if piece in self.waiting_pieces:
|
||||
del self.waiting_pieces[piece]
|
||||
data = yield d
|
||||
logger.debug('Done waiting for piece %i, returning data' % piece)
|
||||
defer.returnValue(data)
|
||||
|
||||
|
||||
@@ -2,22 +2,18 @@ from twisted.internet import defer
|
||||
from twisted.python import log
|
||||
from twisted.web import http, resource, server, static
|
||||
|
||||
# NOTICE!
|
||||
# All these producers are taken directly from the Twisted Project.
|
||||
# This is because i needed to make them accept defers.
|
||||
# /NOTICE!
|
||||
|
||||
class NoRangeStaticProducer(static.NoRangeStaticProducer):
|
||||
@defer.inlineCallbacks
|
||||
def resumeProducing(self):
|
||||
if not self.request:
|
||||
return
|
||||
|
||||
|
||||
data = yield defer.maybeDeferred(self.fileObject.read, self.bufferSize)
|
||||
|
||||
|
||||
if not self.request:
|
||||
return
|
||||
|
||||
|
||||
if data:
|
||||
# this .write will spin the reactor, calling .doWrite and then
|
||||
# .resumeProducing again, so be prepared for a re-entrant call
|
||||
@@ -27,35 +23,37 @@ class NoRangeStaticProducer(static.NoRangeStaticProducer):
|
||||
self.request.finish()
|
||||
self.stopProducing()
|
||||
|
||||
|
||||
class SingleRangeStaticProducer(static.SingleRangeStaticProducer):
|
||||
@defer.inlineCallbacks
|
||||
def resumeProducing(self):
|
||||
if not self.request:
|
||||
return
|
||||
|
||||
|
||||
data = yield defer.maybeDeferred(self.fileObject.read,
|
||||
min(self.bufferSize, self.size - self.bytesWritten))
|
||||
|
||||
|
||||
if not self.request:
|
||||
return
|
||||
|
||||
|
||||
if data:
|
||||
self.bytesWritten += len(data)
|
||||
# this .write will spin the reactor, calling .doWrite and then
|
||||
# .resumeProducing again, so be prepared for a re-entrant call
|
||||
self.request.write(data)
|
||||
|
||||
|
||||
if self.request and self.bytesWritten == self.size:
|
||||
self.request.unregisterProducer()
|
||||
self.request.finish()
|
||||
self.stopProducing()
|
||||
|
||||
|
||||
class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer):
|
||||
@defer.inlineCallbacks
|
||||
def resumeProducing(self):
|
||||
if not self.request:
|
||||
return
|
||||
|
||||
|
||||
data = []
|
||||
dataLength = 0
|
||||
done = False
|
||||
@@ -76,18 +74,19 @@ class MultipleRangeStaticProducer(static.MultipleRangeStaticProducer):
|
||||
except StopIteration:
|
||||
done = True
|
||||
break
|
||||
|
||||
|
||||
if not self.request:
|
||||
return
|
||||
|
||||
|
||||
self.request.write(''.join(data))
|
||||
|
||||
|
||||
if done:
|
||||
self.request.unregisterProducer()
|
||||
self.request.finish()
|
||||
self.stopProducing()
|
||||
self.request = None
|
||||
|
||||
|
||||
class FilelikeObjectResource(static.File):
|
||||
isLeaf = True
|
||||
contentType = None
|
||||
|
||||
Reference in New Issue
Block a user