2018-08-10 19:59:26 +02:00
import logging
import mimetypes
import os
2020-04-21 21:49:31 +02:00
import time
import threading
from io import BytesIO
2018-08-10 19:59:26 +02:00
from thomas import InputBase
# Module-level logger, named after this module via __name__.
logger = logging . getLogger ( __name__ )
2020-04-21 21:49:31 +02:00
# Age window for piece-consumption timestamps: DelugeTorrentInput.read drops
# entries older than time.time() minus this value, so the unit is seconds.
PIECE_REQUEST_HISTORY_TIME = 10
# Upper bound on how many consecutive pieces DelugeTorrentInput.read asks the
# torrent for in a single pass.
MAX_PIECE_REQUEST_COUNT = 20
2018-08-10 19:59:26 +02:00
2020-04-21 21:49:31 +02:00
class DelugeTorrentInput(InputBase):
    """Read one file out of a torrent, fetching its pieces on demand.

    The file starts at byte ``offset`` of the torrent's piece space.  ``read``
    asks the torrent for the piece under the current position (plus some
    read-ahead), waits until ``new_piece_available`` delivers it and then
    serves bytes from that piece's in-memory buffer.
    """

    plugin_name = 'torrent_file'
    protocols = []

    # BytesIO of the piece currently being consumed, or None if none is loaded.
    current_piece_data = None
    # File position of the last can_read() call plus the first element of its
    # result; can_read() is queried again once the position reaches it.
    can_read_to = None
    # Second element of the last can_read() result; caps the read-ahead.
    last_available_piece = None
    # Current position inside the file; None until the first seek().
    _pos = None
    # Set by close(); makes a read() that is waiting for a piece give up.
    _closed = False

    def __init__(self, item, torrent_handler, infohash, offset, path):
        """Bind the input to one file of the torrent identified by *infohash*.

        Args:
            item: Description of the file; ``item['size']`` supplies its size
                and ``item.path`` is handed to the torrent in ``seek``.
            torrent_handler: Object providing ``get_torrent(infohash)``.
            infohash: Identifier passed to ``torrent_handler.get_torrent``.
            offset: Value added to the file position before it is passed to
                the torrent (position of the file inside the torrent data).
            path: Filesystem path of the file; used for the existence check,
                the file name and the MIME type guess.
        """
        self.item = item
        self.torrent_handler = torrent_handler
        self.torrent = torrent_handler.get_torrent(infohash)
        self.infohash = infohash
        self.offset = offset
        self.path = path

        # piece index -> BytesIO holding the delivered piece data
        self.piece_buffer = {}
        # piece index -> threading.Event set once the piece has been delivered
        self.requested_pieces = {}
        # time.time() stamps of recently consumed pieces; drives the read-ahead
        self.piece_consumption_time = []

        self.size, self.filename, self.content_type = self.get_info()

    def get_info(self):
        """Return ``(size, filename, content_type)`` for the file.

        The content type is guessed from the path and falls back to
        ``'bytes'`` when ``mimetypes`` cannot determine it.
        """
        logger.info('Getting info about %r', self.path)
        content_type = mimetypes.guess_type(self.path)[0] or 'bytes'
        return self.item['size'], os.path.basename(self.path), content_type

    def ensure_exists(self):
        """Call ``torrent.can_read`` for the file start if the path is missing."""
        if not os.path.exists(self.path):
            self.torrent.can_read(self.offset)

    def tell(self):
        """Return the current position in the file (``None`` before any seek)."""
        return self._pos

    def seek(self, pos):
        """Move to *pos* and register this reader with the torrent.

        The currently loaded piece buffer is dropped so the next ``read``
        loads the piece that belongs to the new position instead of serving
        leftover bytes from the old one.
        """
        self.ensure_exists()
        self._pos = pos
        self.current_piece_data = None

        logger.debug('Seeking at %s torrentfile_id %r', self.tell(), id(self))
        # NOTE(review): item is indexed like a mapping in get_info() but read
        # as an attribute here; left as in the original - confirm the item
        # type supports both.
        self.torrent.add_reader(
            self,
            self.item.path,
            self.offset + self.tell(),
            self.offset + self.size,
        )

    def _read(self, num):
        """Read up to *num* bytes from the loaded piece and advance the position.

        The request is clamped to the bytes left in the file, so data that
        follows the file inside the same piece is never returned.  ``None``
        or a negative *num* means "as much as possible".
        """
        remaining = max(0, self.size - self._pos)
        if num is None or num < 0 or num > remaining:
            num = remaining

        data = self.current_piece_data.read(num)
        self._pos += len(data)
        return data

    def read(self, num):
        """Return up to *num* bytes from the current position.

        Bytes are served from the loaded piece when it still has data;
        otherwise the next piece is requested, waited for and loaded.

        Returns:
            The bytes read.  ``b''`` is returned at end of file, when the
            input is closed while waiting, or when the piece does not arrive
            within the wait limit.
        """
        if self.current_piece_data is not None:
            data = self._read(num)
            if data:
                return data

        self.ensure_exists()
        if self._pos is None:
            self.seek(0)

        logger.debug(
            'Trying to read %s from %i torrentfile_id %r',
            self.path, self.tell(), id(self),
        )
        tell = self.tell()
        if tell >= self.size:
            # End of file: nothing left to fetch, and the piece under this
            # position (if any) holds data that is not part of this file.
            return b''

        if self.can_read_to is None or self.can_read_to <= tell:
            can_read_result = self.torrent.can_read(self.offset + tell)
            self.last_available_piece = can_read_result[1]
            self.can_read_to = can_read_result[0] + tell

        current_piece, rest = self.current_piece
        logger.debug(
            'Calculated last available piece is %s offset %s can_read_to %s piece_length %s',
            self.last_available_piece, self.offset, self.can_read_to,
            self.torrent.piece_length,
        )

        self._request_pieces(current_piece)
        if not self._wait_for_piece(current_piece):
            return b''
        self._discard_pieces_before(current_piece)

        self.current_piece_data = self.piece_buffer[current_piece]
        self.current_piece_data.seek(rest)
        self.piece_consumption_time.append(time.time())

        logger.debug('Returning %s bytes', num)
        return self._read(num)

    def _request_pieces(self, current_piece):
        """Request *current_piece* and read-ahead pieces not yet requested."""
        # Forget consumption stamps that fell out of the history window.
        cutoff = time.time() - PIECE_REQUEST_HISTORY_TIME
        while self.piece_consumption_time and self.piece_consumption_time[0] < cutoff:
            self.piece_consumption_time.pop(0)

        max_piece_count = (self.last_available_piece - current_piece) + 1
        pieces_to_request = min(
            max(2, len(self.piece_consumption_time)),
            max_piece_count,
            MAX_PIECE_REQUEST_COUNT,
        )
        # The current piece is about to be waited on, so it must always be
        # requested even when the computed count is zero or negative.
        pieces_to_request = max(1, pieces_to_request)

        logger.debug(
            'New piece request status pieces_to_request: %s piece_consumption_time: %s max_piece_count: %s',
            pieces_to_request, len(self.piece_consumption_time), max_piece_count,
        )
        logger.debug('Requested pieces: %r', self.requested_pieces.items())
        logger.debug('Piece buffer: %r', self.piece_buffer.keys())

        for piece in range(current_piece, current_piece + pieces_to_request):
            if piece in self.requested_pieces:
                continue

            logger.debug('Requesting piece %s', piece)
            # Register the event before asking, so a fast delivery finds it.
            self.requested_pieces[piece] = threading.Event()
            self.torrent.request_piece(piece)

    def _wait_for_piece(self, piece):
        """Wait for *piece* to be delivered; return False on close or timeout."""
        arrived = self.requested_pieces[piece]
        # Wait in one-second slices (at most 1000 of them) so that close()
        # is noticed promptly.
        for _ in range(1000):
            if arrived.wait(1):
                return True
            if self._closed:
                return False
        return False

    def _discard_pieces_before(self, current_piece):
        """Drop buffers and request records of pieces before *current_piece*."""
        for stale in [p for p in self.piece_buffer if p < current_piece]:
            del self.piece_buffer[stale]
        for stale in [p for p in self.requested_pieces if p < current_piece]:
            del self.requested_pieces[stale]

    @property
    def current_piece(self):
        """``(piece_index, offset_in_piece)`` for the current position."""
        from_byte = self.offset + self.tell()
        return divmod(from_byte, self.torrent.piece_length)

    def new_piece_available(self, piece, data):
        """Store *data* for *piece* and wake a ``read`` waiting for it.

        Pieces that were never requested by this input, or that were already
        delivered, are ignored.
        """
        if piece not in self.requested_pieces or self.requested_pieces[piece].is_set():
            return

        logger.debug("Setting data for piece %s", piece)
        # Store the buffer before setting the event: read() looks the buffer
        # up as soon as the event fires.
        self.piece_buffer[piece] = BytesIO(data)
        self.requested_pieces[piece].set()

    def close(self):
        """Mark the input closed and unregister it from the torrent."""
        # Set the flag first so a read() waiting for a piece stops even if
        # remove_reader() raises.
        self._closed = True
        self.torrent.remove_reader(self)