Blob Blame History Raw
/*
 * lftp - file transfer program
 *
 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TORRENT_H
#define TORRENT_H

#include "FileAccess.h"
#include "Bencode.h"
#include "Error.h"
#include "ProtoLog.h"
#include "network.h"
#include "RateLimit.h"
#include "Resolver.h"
#include "FileCopy.h"
#include "DHT.h"

class FDCache;
class TorrentBlackList;
class Torrent;
class TorrentPeer;

// A bit set stored in a byte array (the xarray<unsigned char> base).
// bit_length is the number of addressable bits; the byte storage is sized
// to (bit_length+7)/8 by set_bit_length().
class BitField : public xarray<unsigned char>
{
   int bit_length;
public:
   BitField() : bit_length(0) {}
   BitField(int bits);

   // true when i addresses a bit inside [0,bit_length).
   bool valid_index(int i) const {
      if(i<0)
         return false;
      return i<bit_length;
   }

   bool get_bit(int i) const;
   void set_bit(int i,bool value);

   // Range queries; the no-argument forms cover the whole field.
   bool has_any_set(int from,int to) const;
   bool has_all_set(int from,int to) const;
   bool has_any_set() const {
      return has_any_set(0,bit_length);
   }
   bool has_all_set() const {
      return has_all_set(0,bit_length);
   }

   int get_bit_length() const {
      return bit_length;
   }
   // Records the new bit count and resizes the byte storage to hold it.
   void set_bit_length(int b) {
      bit_length=b;
      set_length((b+7)/8);
   }

   // Zeroes every byte currently stored.
   void clear() {
      memset(buf,0,length());
   }
   void set_range(int from,int to,bool value);
};

class TorrentBuild : public SMTask, public ProtoLog
{
   xstring_c top_path;
   xstring name;
   FileSet files;
   StringSet dirs_to_scan;
   bool done;
   Ref<Error> error;

   Ref<DirectedBuffer> translate;
   const char *lc_to_utf8(const char *s);

   friend class Torrent;
   Ref<BeNode> info;
   xstring pieces;
   off_t total_length;
   unsigned piece_length;

   const char *CurrPath() const { return dirs_to_scan[0]; }
   void NextDir() { dirs_to_scan.Remove(0); }
   void QueueDir(const char *dir) { dirs_to_scan.Append(dir); }
   void AddFile(const char *path,struct stat *st);
   void Finish();

public:
   TorrentBuild(const char *top);
   int Do();
   bool Done() const { return done || error; }
   bool Failed() const { return error; }
   const char *ErrorText() const { return error->Text(); }
   BeNode *GetFilesNode() const { return info->lookup("files"); }
   void SetPiece(unsigned p,const xstring& sha1);
   const xstring& GetMetadata();
   const char *GetBaseDirectory() const { return dirname(top_path); }
   const xstring& Status() const;
};

class TorrentPiece
{
   unsigned sources_count;	    // how many peers have the piece
   unsigned downloader_count;	    // how many downloaders of the piece are there
   float ratio;
   RefToArray<const TorrentPeer*> downloader; // which peers download the blocks
   Ref<BitField> block_map;	    // which blocks are present.

public:
   TorrentPiece() : sources_count(0), downloader_count(0), ratio(0) {}
   ~TorrentPiece() {}

   unsigned get_sources_count() const { return sources_count; }
   void add_sources_count(int diff) { sources_count+=diff; }
   bool has_no_sources() const { return sources_count==0; }

   bool has_a_downloader() const { return downloader_count>0; }
   void set_downloader(unsigned block,const TorrentPeer *o,const TorrentPeer *n,unsigned blk_count) {
      if(!downloader) {
	 if(o || !n)
	    return;
	 downloader=new const TorrentPeer*[blk_count];
	 for(unsigned i=0; i<blk_count; i++)
	    downloader[i]=0;
      }
      const TorrentPeer*& d=downloader[block];
      if(d==o) {
	 d=n;
	 downloader_count+=(n!=0)-(o!=0);
      }
   }
   void cleanup() {
      if(downloader_count==0 && downloader)
	 downloader=0;
   }
   const TorrentPeer *downloader_for(unsigned block) {
      return downloader ? downloader[block] : 0;
   }

   void set_block_present(unsigned block,unsigned blk_count) {
      if(!block_map)
	 block_map=new BitField(blk_count);
      block_map->set_bit(block,1);
   }
   void set_blocks_absent() {
      block_map=0;
   }
   void free_block_map() {
      block_map=0;
   }
   bool block_present(unsigned block) const {
      return block_map && block_map->get_bit(block);
   }
   bool all_blocks_present(unsigned blk_count) const {
      return block_map && block_map->has_all_set(0,blk_count);
   }
   bool any_blocks_present() const {
      return block_map; // it's allocated when setting any bit
   }

   float get_ratio() const { return ratio; }
   void add_ratio(float add) { ratio+=add; }
};

// One file of a torrent: its path plus the byte range [pos,pos+length)
// it occupies. The path is a heap copy made by set() and released by unset().
struct TorrentFile
{
   char *path;
   off_t pos;
   off_t length;

   // Stores a copy of n together with the position and length.
   void set(const char *n,off_t p,off_t l) {
      pos=p;
      length=l;
      path=xstrdup(n);
   }
   // Frees the path copy and resets the pointer.
   void unset() {
      xfree(path);
      path=0;
   }
   // true when p lies inside [pos,pos+length).
   bool contains_pos(off_t p) const {
      if(p<pos)
         return false;
      return p<pos+length;
   }
};

// Array of TorrentFile entries; the destructor releases every entry's path.
class TorrentFiles : public xarray<TorrentFile>
{
   // Orders by position; for equal positions the shorter file comes first,
   // so that zero-sized files are placed before non-zero ones.
   static int pos_cmp(const TorrentFile *a, const TorrentFile *b) {
      if(a->pos != b->pos)
         return a->pos < b->pos ? -1 : 1;
      if(a->length == b->length)
         return 0;
      return a->length < b->length ? -1 : 1;
   }
public:
   TorrentFile *file(int i) {
      return get_non_const()+i;
   }
   TorrentFiles(const BeNode *f_node,const Torrent *t);
   ~TorrentFiles() {
      const int n=length();
      for(int idx=0; idx<n; idx++)
         file(idx)->unset();
   }
   TorrentFile *FindByPosition(off_t p);
};

// Listening socket task for one address family (af) and socket type
// (SOCK_STREAM by default, SOCK_DGRAM for UDP).
class TorrentListener : public SMTask, protected ProtoLog, protected Networker
{
   Ref<Error> error;
   int af;
   int type;
   int sock;
   sockaddr_u addr;
   Speedometer rate;
   void FillAddress(int port);
   Time last_sent_udp;
   int  last_sent_udp_count;
public:
   TorrentListener(int a,int type=SOCK_STREAM);
   ~TorrentListener();
   int Do();

   int GetPort() const {
      return addr.port();
   }
   const char *GetAddress() const {
      return addr.address();
   }
   // Log prefix: "torrent" for non-datagram sockets, otherwise an
   // udp/udp6 variant chosen by the address family.
   const char *GetLogContext() {
      if(type!=SOCK_DGRAM)
         return "torrent";
      if(af==AF_INET)
         return "torrent(udp)";
      return "torrent(udp6)";
   }

   int SendUDP(const sockaddr_u& a,const xstring& buf);
   bool MaySendUDP();
};

class TorrentTracker;

// One torrent: holds the metainfo, the per-piece/per-block bookkeeping,
// the peer and tracker lists and the transfer counters declared below.
// The static members (listeners, DHT objects, fd cache, black list and the
// `torrents' map) are shared by all Torrent instances.
class Torrent : public SMTask, protected ProtoLog, public ResClient
{
   friend class TorrentPeer;
   friend class TorrentDispatcher;
   friend class TorrentListener;
   friend class TorrentFiles;
   friend class DHT;

   // state flags
   bool shutting_down;
   bool complete;
   bool end_game;
   bool is_private;
   bool validating;
   bool force_valid;
   bool build_md;
   bool stop_if_complete;
   bool stop_if_known;
   bool md_saved;
   unsigned validate_index;
   Ref<Error> invalid_cause;

   // data shared by all torrents
   static const unsigned PEER_ID_LEN = 20;
   static xstring my_peer_id;
   static xstring my_key;
   static unsigned my_key_num;
   static xmap<Torrent*> torrents;   // looked up by info_hash, see FindTorrent()
   static SMTaskRef<TorrentListener> listener;
   static SMTaskRef<TorrentListener> listener_udp;
   static SMTaskRef<DHT> dht;
#if INET6
   static SMTaskRef<TorrentListener> listener_ipv6;
   static SMTaskRef<TorrentListener> listener_ipv6_udp;
   static SMTaskRef<DHT> dht_ipv6;
#endif
   static SMTaskRef<FDCache> fd_cache;
   static Ref<TorrentBlackList> black_list;

   // Returns dht_ipv6 for AF_INET6 when it is set (INET6 builds only);
   // in every other case the plain `dht' reference is returned.
   static const SMTaskRef<DHT>& GetDHT(int af)
   {
#if INET6
      if(af==AF_INET6 && dht_ipv6)
	 return dht_ipv6;
#endif
      return dht;
   }
   static const SMTaskRef<DHT>& GetDHT(const sockaddr_u& a) { return GetDHT(a.family()); }
   // Returns the UDP listener for the address family: listener_ipv6_udp for
   // AF_INET6 (INET6 builds only, even if unset), otherwise listener_udp.
   static const SMTaskRef<TorrentListener>& GetUDPSocket(int af)
   {
#if INET6
      if(af==AF_INET6)
	 return listener_ipv6_udp;
#endif
      return listener_udp;
   }
   static const SMTaskRef<TorrentListener>& GetUDPSocket(const sockaddr_u& a) { return GetUDPSocket(a.family()); }

   // registry of torrents and dispatching of incoming data
   static Torrent *FindTorrent(const xstring& info_hash) { return torrents.lookup(info_hash); }
   static void AddTorrent(Torrent *t);
   static void RemoveTorrent(Torrent *t);
   static int GetTorrentsCount() { return torrents.count(); }
   static void Dispatch(const xstring& info_hash,int s,const sockaddr_u *remote_addr,IOBuffer *recv_buf);
   static void DispatchUDP(const char *buf,int len,const sockaddr_u& src);

   // metadata acquisition and persistence
   xstring md_download;
   size_t metadata_size;
   void FetchMetadataFromURL(const char *url);
   void StartMetadataDownload();
   void MetadataDownloaded();
   bool SetMetadata(const xstring& md);
   void ParseMagnet(const char *p);
   const char *GetMetadataPath() const;
   bool SaveMetadata() const;
   bool LoadMetadata(const char *path);

   void Startup();

   void SetTotalLength(off_t);
   void StartValidating();

   xstring_c metainfo_url;
   SMTaskRef<FileCopy> metainfo_copy;
   SMTaskRef<TorrentBuild> building;
   Ref<BeNode> metainfo_tree;
   BeNode *info;
   xstring metadata;
   xstring info_hash;
   const xstring *pieces;
   xstring name;
   Ref<TorrentFiles> files;

   // character set translation of strings in the metainfo
   Ref<DirectedBuffer> recv_translate;
   Ref<DirectedBuffer> recv_translate_utf8;
   void InitTranslation();
   void TranslateString(BeNode *node) const;
   void TranslateStringFromUTF8(BeNode *node) const;

   // trackers, listeners and DHT control
   TaskRefArray<TorrentTracker> trackers;
   bool TrackersDone() const;
   void StartTrackers();
   void ShutdownTrackers() const;
   void SendTrackersRequest(const char *e) const;
   static void StartListener();
   static void StartListenerUDP();
   static void StopListener();
   static void StopListenerUDP();
   static void StartDHT();
   static void StopDHT();
   void AnnounceDHT();
   void DenounceDHT();

   // piece geometry; the last piece has its own length, see PieceLength(p)
   unsigned piece_length;
   unsigned last_piece_length;
   unsigned total_pieces;
   unsigned complete_pieces;
   Ref<BitField> my_bitfield;

   static const unsigned BLOCK_SIZE = 0x4000;

   // byte counters
   unsigned long long total_length;
   unsigned long long total_recv;
   unsigned long long total_sent;
   unsigned long long total_left;

   void AccountSend(unsigned p,unsigned len);
   void AccountRecv(unsigned p,unsigned len);

   void SetError(Error *);
   void SetError(const char *);

   // Dictionary lookup helpers; the BeNode overloads forward to the
   // xmap_p overload using the node's `dict' member.
   BeNode *Lookup(xmap_p<BeNode>& d,const char *name,BeNode::be_type_t type);
   BeNode *Lookup(BeNode *d,const char *name,BeNode::be_type_t type) { return Lookup(d->dict,name,type); }
   BeNode *Lookup(Ref<BeNode>& d,const char *name,BeNode::be_type_t type) { return Lookup(d->dict,name,type); }

   TaskRefArray<TorrentPeer> peers;
   static int PeersCompareActivity(const SMTaskRef<TorrentPeer> *p1,const SMTaskRef<TorrentPeer> *p2);
   static int PeersCompareRecvRate(const SMTaskRef<TorrentPeer> *p1,const SMTaskRef<TorrentPeer> *p2);
   static int PeersCompareSendRate(const SMTaskRef<TorrentPeer> *p1,const SMTaskRef<TorrentPeer> *p2);

   // Per-piece state; the helpers below forward to piece_info[piece] and
   // use BlocksInPiece(piece) as the block count where one is needed.
   RefToArray<TorrentPiece> piece_info;
   unsigned blocks_in_piece;
   unsigned blocks_in_last_piece;
   bool BlockPresent(unsigned piece,unsigned block) const {
      return piece_info[piece].block_present(block);
   }
   bool AllBlocksPresent(unsigned piece) const {
      return piece_info[piece].all_blocks_present(BlocksInPiece(piece));
   }
   bool AnyBlocksPresent(unsigned piece) const {
      return piece_info[piece].any_blocks_present();
   }
   bool AllBlocksAbsent(unsigned piece) const {
      return !AnyBlocksPresent(piece);
   }
   void SetBlocksAbsent(unsigned piece) {
      piece_info[piece].set_blocks_absent();
   }
   void SetBlockPresent(unsigned piece,unsigned block) {
      piece_info[piece].set_block_present(block,BlocksInPiece(piece));
   }

   void RebuildPiecesNeeded();
   Timer pieces_timer; // for periodic pieces scanning
   xarray<unsigned> pieces_needed;
   static int PiecesNeededCmp(const unsigned *a,const unsigned *b);
   unsigned last_piece;

   // piece availability statistics
   unsigned min_piece_sources;
   unsigned avg_piece_sources;   // AvgPieceSources() returns this value divided by 256
   unsigned pieces_available_pct;
   float current_min_ppr;
   float current_max_ppr;

   void SetPieceNotWanted(unsigned piece);
   void SetDownloader(unsigned piece,unsigned block,const TorrentPeer *o,const TorrentPeer *n);

   xstring_c cwd;
   xstring_c output_dir;

   // file access
   const char *FindFileByPosition(unsigned piece,unsigned begin,off_t *f_pos,off_t *f_tail) const;
   const char *MakePath(BeNode *p) const;
   int OpenFile(const char *f,int m,off_t size=0);
   void CloseFile(const char *f) const;

   void StoreBlock(unsigned piece,unsigned begin,unsigned len,const char *buf,TorrentPeer *src_peer);
   const xstring& RetrieveBlock(unsigned piece,unsigned begin,unsigned len);

   Speedometer recv_rate;
   Speedometer send_rate;

   RateLimit rate_limit;
   bool RateLow(RateLimit::dir_t dir) { return rate_limit.Relaxed(dir); }

   // peer counters and limits
   int connected_peers_count;
   int active_peers_count;
   int complete_peers_count;
   int am_interested_peers_count;
   int am_not_choking_peers_count;
   int max_peers;
   int seed_min_peers;

   bool SeededEnough() const;
   float stop_on_ratio;
   float stop_min_ppr;
   Timer seed_timer;
   Timer timeout_timer;

   Timer decline_timer;
   Timer optimistic_unchoke_timer;
   Timer peers_scan_timer;
   Timer am_interested_timer;
   Timer shutting_down_timer;

   Timer dht_announce_timer;
   int dht_announce_count;
   int dht_announce_count_ipv6;

   static const int max_uploaders = 20;
   static const int min_uploaders = 1;
   static const int max_downloaders = 20;
   static const int min_downloaders = 4;

   bool NeedMoreUploaders();
   bool AllowMoreDownloaders();
   void UnchokeBestUploaders();
   void ScanPeers();
   void OptimisticUnchoke();
   void ReducePeers();
   void ReduceUploaders();
   void ReduceDownloaders();

   // rate limit accounting on behalf of peers
   int PeerBytesAllowed(const TorrentPeer *peer,RateLimit::dir_t dir);
   void PeerBytesUsed(int b,RateLimit::dir_t dir);
   void PeerBytesGot(int b) { PeerBytesUsed(b,RateLimit::GET); }

   static void BlackListPeer(const TorrentPeer *peer,const char *timeout);
   static bool BlackListed(const TorrentPeer *peer);
   TorrentPeer *FindPeerById(const xstring& p_id);

public:
   static void ClassInit();

   Torrent(const char *mf,const char *cwd,const char *output_dir);
   ~Torrent();

   int Do();
   int Done() const;

   const xstring& Status();

   const Error *GetInvalidCause() const { return invalid_cause; }

   void Shutdown();
   bool ShuttingDown() const { return shutting_down; }
   void PrepareToDie();

   bool CanAccept() const;
   void Accept(int s,const sockaddr_u *a,IOBuffer *rb);
   static bool NoTorrentCanAccept();

   static void SHA1(const xstring& str,xstring& buf);
   void ValidatePiece(unsigned p);
   // The piece with index total_pieces-1 uses the "last piece" values;
   // every other index uses the regular ones.
   unsigned PieceLength(unsigned p) const { return p==total_pieces-1 ? last_piece_length : piece_length; }
   unsigned BlocksInPiece(unsigned p) const { return p==total_pieces-1 ? blocks_in_last_piece : blocks_in_piece; }

   const TaskRefArray<TorrentPeer>& GetPeers() const { return peers; }
   void AddPeer(TorrentPeer *);
   void CleanPeers();

   const xstring& GetInfoHash() const { return info_hash; }
   int GetPeersCount() const { return peers.count(); }
   int GetConnectedPeersCount() const { return connected_peers_count; }
   int GetActivePeersCount() const { return active_peers_count; }
   int GetCompletePeersCount() const { return complete_peers_count; }

   bool Complete() const { return complete; }
   bool Private() const { return is_private; }
   double GetRatio() const;
   void CalcPiecesStats();
   void CalcPerPieceRatio();
   float GetMinPerPieceRatio() const { return current_min_ppr; }
   float GetMaxPerPieceRatio() const { return current_max_ppr; }
   unsigned MinPieceSources() const { return min_piece_sources; }
   double AvgPieceSources() const { return avg_piece_sources/256.; }
   unsigned PiecesAvailablePct() const { return pieces_available_pct; }
   unsigned long long TotalLength() const { return total_length; }
   unsigned PieceLength() const { return piece_length; }
   // The torrent name if it is set, otherwise the metainfo URL.
   const char *GetName() const { return name?name.get():metainfo_url.get(); }
   // Downloading means: metadata is present and the torrent is neither
   // validating, complete nor shutting down.
   bool IsDownloading() const { return HasMetadata() && !IsValidating() && !Complete() && !ShuttingDown(); }

   void Reconfig(const char *name);
   const char *GetLogContext() { return GetName(); }

   void ForceValid() { force_valid=true; }
   bool IsValidating() const { return validating; }
   void Share() { build_md=true; }
   bool IsSharing() const { return build_md; }

   void StopIfComplete() { stop_if_complete=true; }
   // Note: also turns on stop_if_complete.
   void StopIfKnown() { stop_if_known=stop_if_complete=true; }

   // Port/address accessors return 0 (or "::") when the listener is not set.
   static int GetPort();
   static int GetPortIPv4() { return listener?listener->GetPort():0; }
#if INET6
   static int GetPortIPv6() { return listener_ipv6?listener_ipv6->GetPort():0; }
   static const char *GetAddressIPv6() { return listener_ipv6?listener_ipv6->GetAddress():"::"; }
#endif
   int GetWantedPeersCount() const;
   static const xstring& GetMyPeerId() { return my_peer_id; }
   static const xstring& GetMyKey() { return my_key; }
   static unsigned GetMyKeyNum() { return my_key_num; }

   unsigned long long GetTotalSent() { return total_sent; }
   unsigned long long GetTotalRecv() { return total_recv; }
   unsigned long long GetTotalLeft() { return total_left; }

   const TaskRefArray<TorrentTracker>& Trackers() { return trackers; }
   bool HasMetadata() const { return metadata!=0; }
   void RestartPeers();

   // Calls StartDHT() and, if `dht' is set afterwards, adds n as a
   // bootstrap node to it (dht_ipv6 is not touched here).
   static void BootstrapDHT(const char *n) {
      StartDHT();
      if(dht)
	 dht->AddBootstrapNode(n);
   }

   // true when any DHT object is set.
   static bool HasDHT() {
#if INET6
      if(dht_ipv6)
	 return true;
#endif
      if(dht)
	 return true;
      return false;
   }
   // true when the DHT object for exactly this address family is set.
   static bool HasDHT(int af) {
#if INET6
      if(af==AF_INET6 && dht_ipv6)
	 return true;
#endif
      if(af==AF_INET && dht)
	 return true;
      return false;
   }
   const char *DHT_Status() const;
   void DHT_Announced(int af);	 // called from DHT to count announces
};

// Cache of file descriptors looked up by file name.
class FDCache : public SMTask, public ResClient
{
   // One cached entry: the descriptor, the errno value saved with it and
   // the time it was last used.
   struct FD
   {
      int fd;
      int saved_errno;
      time_t last_used;
   };
   int max_count;
   int max_time;
   // NOTE(review): three separate maps - presumably one per open mode; confirm in the implementation.
   xmap<FD> cache[3];
   Timer clean_timer;

public:
   int OpenFile(const char *name,int mode,off_t size=0);
   void Close(const char *name);
   int Count() const;
   void Clean();
   bool CloseOne();
   void CloseAll();
   FDCache();
   ~FDCache();

   int Do();
};

// One remote peer of a Torrent (the `parent'): connection state, choke and
// interest flags, request queues, rate accounting, and the packet classes
// used to pack/unpack peer wire messages.
class TorrentPeer : public SMTask, protected ProtoLog, public Networker
{
   friend class Torrent;

   Ref<Error> error;
   Torrent *parent;

   int tracker_no;   // tracker index, or one of the TR_* special values below

   sockaddr_u addr;
   int sock;         // -1 when not connected, see NotConnected()
   int udp_port;
   bool connected;
   bool passive;

   xstring_c last_dc;

   Timer timeout_timer;
   Timer retry_timer;
   Timer keepalive_timer;
   Timer choke_timer;
   Timer interest_timer;
   Timer activity_timer;

   SMTaskRef<IOBuffer> recv_buf;
   SMTaskRef<IOBuffer> send_buf;

   unsigned long long peer_recv;
   unsigned long long peer_sent;

   Speedometer peer_recv_rate;
   Speedometer peer_send_rate;

   xstring peer_id;
   unsigned char extensions[8];
   TorrentPeer *duplicate;
   bool myself;

   // Feature bits tested in the 8-byte `extensions' array.
   bool FastExtensionEnabled() const { return extensions[7]&0x04; }
   bool LTEPExtensionEnabled() const { return extensions[5]&0x10; }
   bool DHT_Enabled() const { return extensions[7]&0x01; }

   bool am_choking;
   bool am_interested;
   bool peer_choking;
   bool peer_interested;

   bool upload_only;

   Ref<BitField> peer_bitfield;
   unsigned peer_complete_pieces;

   xqueue<unsigned,xarray<unsigned> > fast_set;
   bool InFastSet(unsigned) const;
   xqueue<unsigned,xarray<unsigned> > suggested_set;

   // Message type codes; MSG_KEEPALIVE is a pseudo type for packets
   // that carry no type byte (see Packet::ComputeLength).
   enum packet_type
   {
      MSG_KEEPALIVE=-1,
      MSG_CHOKE=0,
      MSG_UNCHOKE=1,
      MSG_INTERESTED=2,
      MSG_UNINTERESTED=3,
      MSG_HAVE=4,
      MSG_BITFIELD=5,
      MSG_REQUEST=6,
      MSG_PIECE=7,
      MSG_CANCEL=8,
      MSG_PORT=9,
      MSG_SUGGEST_PIECE=13,
      MSG_HAVE_ALL=14,
      MSG_HAVE_NONE=15,
      MSG_REJECT_REQUEST=16,
      MSG_ALLOWED_FAST=17,
      MSG_EXTENDED=20,
   };
   enum msg_ext_id
   {
      MSG_EXT_HANDSHAKE=0,
      MSG_EXT_PEX=1,
      MSG_EXT_METADATA=2,
   };
   enum ut_metadata_msg_id
   {
      UT_METADATA_REQUEST=0,
      UT_METADATA_DATA=1,
      UT_METADATA_REJECT=2,
   };
public:
   enum { TR_ACCEPTED=-1, TR_DHT=-2, TR_PEX=-3 };  // special values for tracker_no
   enum unpack_status_t
   {
      UNPACK_SUCCESS=0,
      UNPACK_WRONG_FORMAT=-1,
      UNPACK_PREMATURE_EOF=-2,
      UNPACK_NO_DATA_YET=1
   };
   // Base packet. `length' is the payload length that follows the 4-byte
   // length prefix (it includes the type byte for non-keepalive packets);
   // `unpacked' is the read offset into the buffer while unpacking, so the
   // packet ends at offset length+4.
   class Packet
   {
      static bool is_valid_reply(int p)
      {
	 return (p>=0 && p<=MSG_PORT)
	    || (p>=MSG_SUGGEST_PIECE && p<=MSG_ALLOWED_FAST)
	    || p==MSG_EXTENDED;
      }
   protected:
      int length;
      int unpacked;
      packet_type type;
      // Number of payload bytes not yet consumed by Unpack().
      int RemainingBytes() const { return length+4-unpacked; }
   public:
      Packet(packet_type t);
      // All fields get defined values so that GetPacketType() and
      // ComputeLength() never read an indeterminate `type'.
      Packet() { length=0; unpacked=0; type=MSG_KEEPALIVE; }
      virtual void ComputeLength() { length=(type>=0); }
      virtual void Pack(SMTaskRef<IOBuffer>& b);
      virtual unpack_status_t Unpack(const Buffer *b);
      virtual ~Packet() {}
      int GetLength() const { return length; }
      packet_type GetPacketType() const { return type; }
      const char *GetPacketTypeText() const;
      // Removes the whole packet (length prefix + payload) from the buffer.
      void DropData(SMTaskRef<IOBuffer>& b) { b->Skip(4+length); }
      bool TypeIs(packet_type t) const { return type==t; }
      static unpack_status_t UnpackBencoded(const Buffer *b,int *offset,int limit,Ref<BeNode> *out);
   };
   // Packet carrying a single 32-bit piece index.
   class _PacketPiece : public Packet
   {
   public:
      unsigned piece;
      _PacketPiece(packet_type t,unsigned p) : Packet(t), piece(p) { length+=4; }
      unpack_status_t Unpack(const Buffer *b)
	 {
	    unpack_status_t res;
	    res=Packet::Unpack(b);
	    if(res!=UNPACK_SUCCESS)
	       return res;
	    // the declared length must cover the piece index
	    if(RemainingBytes()<4)
	       return UNPACK_WRONG_FORMAT;
	    piece=b->UnpackUINT32BE(unpacked);
	    unpacked+=4;
	    return UNPACK_SUCCESS;
	 }
      void ComputeLength() { Packet::ComputeLength(); length+=4; }
      void Pack(SMTaskRef<IOBuffer>& b) { Packet::Pack(b); b->PackUINT32BE(piece); }
   };
   class PacketHave : public _PacketPiece {
   public:
      PacketHave(unsigned p=0) : _PacketPiece(MSG_HAVE,p) {}
   };
   class PacketBitField : public Packet
   {
   public:
      Ref<BitField> bitfield;
      PacketBitField() : Packet(MSG_BITFIELD) {}
      PacketBitField(const BitField *bf);
      ~PacketBitField();
      unpack_status_t Unpack(const Buffer *b);
      void ComputeLength();
      void Pack(SMTaskRef<IOBuffer>& b);
   };
   // Packet carrying index, begin and length (request, cancel, reject).
   class _PacketIBL : public Packet
   {
   public:
      unsigned index,begin,req_length;
      _PacketIBL(packet_type t,unsigned i,unsigned b,unsigned l);
      unpack_status_t Unpack(const Buffer *b);
      void ComputeLength();
      void Pack(SMTaskRef<IOBuffer>& b);
   };
   class PacketRequest : public _PacketIBL
   {
   public:
      PacketRequest(unsigned i=0,unsigned b=0,unsigned l=0)
	 : _PacketIBL(MSG_REQUEST,i,b,l) {}
   };
   class PacketCancel : public _PacketIBL {
   public:
      PacketCancel(unsigned i=0,unsigned b=0,unsigned l=0)
	 : _PacketIBL(MSG_CANCEL,i,b,l) {}
   };
   // Block data: piece index, offset within the piece, then the bytes.
   class PacketPiece : public Packet
   {
   public:
      unsigned index,begin;
      xstring data;
      PacketPiece() : Packet(MSG_PIECE), index(0), begin(0) {}
      PacketPiece(unsigned i,unsigned b,const xstring &s)
	 : Packet(MSG_PIECE), index(i), begin(b) { data.set(s); length+=8+data.length(); }
      unpack_status_t Unpack(const Buffer *b)
	 {
	    unpack_status_t res;
	    res=Packet::Unpack(b);
	    if(res!=UNPACK_SUCCESS)
	       return res;
	    // A packet shorter than index+begin would make the data size
	    // below negative, i.e. a huge unsigned value passed to nset().
	    if(RemainingBytes()<8)
	       return UNPACK_WRONG_FORMAT;
	    index=b->UnpackUINT32BE(unpacked);unpacked+=4;
	    begin=b->UnpackUINT32BE(unpacked);unpacked+=4;
	    unsigned bytes=RemainingBytes();
	    data.nset(b->Get()+unpacked,bytes);
	    unpacked+=bytes;
	    return UNPACK_SUCCESS;
	 }
      void ComputeLength() { Packet::ComputeLength(); length+=8+data.length(); }
      void Pack(SMTaskRef<IOBuffer>& b) {
	 Packet::Pack(b);
	 b->PackUINT32BE(index);
	 b->PackUINT32BE(begin);
	 b->Put(data);
      }
   };
   // Packet carrying a 16-bit port number.
   class PacketPort : public Packet
   {
   public:
      unsigned port;
      PacketPort(unsigned p=0) : Packet(MSG_PORT), port(p) { length+=2; }
      unpack_status_t Unpack(const Buffer *b)
	 {
	    unpack_status_t res;
	    res=Packet::Unpack(b);
	    if(res!=UNPACK_SUCCESS)
	       return res;
	    // the declared length must cover the port field
	    if(RemainingBytes()<2)
	       return UNPACK_WRONG_FORMAT;
	    port=b->UnpackUINT16BE(unpacked);
	    unpacked+=2;
	    return UNPACK_SUCCESS;
	 }
      void ComputeLength() { Packet::ComputeLength(); length+=2; }
      void Pack(SMTaskRef<IOBuffer>& b) { Packet::Pack(b); b->PackUINT16BE(port); }
   };
   class PacketSuggestPiece : public _PacketPiece {
   public:
      PacketSuggestPiece(unsigned p=0) : _PacketPiece(MSG_SUGGEST_PIECE,p) {}
   };
   class PacketAllowedFast : public _PacketPiece {
   public:
      PacketAllowedFast(unsigned p=0) : _PacketPiece(MSG_ALLOWED_FAST,p) {}
   };
   class PacketRejectRequest : public _PacketIBL {
   public:
      PacketRejectRequest(unsigned i=0,unsigned b=0,unsigned l=0)
	 : _PacketIBL(MSG_REJECT_REQUEST,i,b,l) {}
   };
   // Extended message: one code byte, a bencoded node and an optional
   // raw `appendix' holding whatever follows the bencoded data.
   class PacketExtended : public Packet
   {
   public:
      unsigned char code;
      Ref<BeNode> data;
      xstring appendix;
      PacketExtended(unsigned char c='\0',BeNode *d=0)
	 : Packet(MSG_EXTENDED), code(c), data(d) { length++; if(data) length+=data->ComputeLength(); }
      unpack_status_t Unpack(const Buffer *b)
	 {
	    unpack_status_t res;
	    res=Packet::Unpack(b);
	    if(res!=UNPACK_SUCCESS)
	       return res;
	    // the declared length must cover at least the code byte
	    if(RemainingBytes()<1)
	       return UNPACK_WRONG_FORMAT;
	    code=b->UnpackUINT8(unpacked); unpacked++;
	    res=UnpackBencoded(b,&unpacked,length+4,&data);
	    // anything left after the bencoded part is kept as the appendix
	    if(unpacked<length+4) {
	       appendix.nset(b->Get()+unpacked,length+4-unpacked);
	       unpacked=length+4;
	    }
	    return res;
	 }
      void ComputeLength() { Packet::ComputeLength(); length++; if(data) length+=data->ComputeLength(); length+=appendix.length(); }
      void Pack(SMTaskRef<IOBuffer>& b) { Packet::Pack(b); b->PackUINT8(code); if(data) data->Pack(b); b->Put(appendix); }
      void SetAppendix(const char *s,int len) { appendix.nset(s,len); length+=len; }
   };

private:
   unpack_status_t UnpackPacket(SMTaskRef<IOBuffer>& ,Packet **);
   void HandlePacket(Packet *);
   void HandleExtendedMessage(PacketExtended *);

   static const int MAX_QUEUE_LEN = 16;
   RefQueue<PacketRequest> recv_queue;
   RefQueue<PacketRequest> sent_queue;

   unsigned last_piece;
   static const unsigned NO_PIECE = ~0U;
   void SetLastPiece(unsigned p);
   unsigned GetLastPiece() const;
   bool HasNeededPieces();
   void SetPieceHaving(unsigned p,bool have);
   void SetAmInterested(bool);
   void SetAmChoking(bool);

   void ClearSentQueue(int i);
   // Clears the sent queue up to and including its last element.
   void ClearSentQueue() { ClearSentQueue(sent_queue.count()-1); }

   int FindRequest(unsigned piece,unsigned begin) const;

   void SetError(const char *);
   void SendHandshake();
   unpack_status_t RecvHandshake();
   void SendExtensions();
   void Disconnect(const char *dc=0);
   void Restart();
   int SendDataRequests(unsigned p);
   void SendDataRequests();
   void Have(unsigned p);
   void SendDataReply();
   void CancelBlock(unsigned p,unsigned b);

   void MarkPieceInvalid(unsigned p);
   unsigned invalid_piece_count;

   // rate limit accounting; the two-element pool is presumably indexed by RateLimit::dir_t
   int peer_bytes_pool[2];
   int BytesAllowed(RateLimit::dir_t dir);
   bool BytesAllowed(RateLimit::dir_t dir,unsigned bytes);
   bool BytesAllowedToGet(unsigned b) { return BytesAllowed(RateLimit::GET,b); }
   bool BytesAllowedToPut(unsigned b) { return BytesAllowed(RateLimit::PUT,b); }
   void BytesUsed(int bytes,RateLimit::dir_t dir);
   void BytesGot(int b) { BytesUsed(b,RateLimit::GET); }
   void BytesPut(int b) { BytesUsed(b,RateLimit::PUT); }

   int msg_ext_metadata;
   int msg_ext_pex;

   size_t metadata_size;
   void SendMetadataRequest();

   // Peer exchange state; the timers are constructed with 60 and 59.
   struct ut_pex_data
   {
      xmap<char> sent; // key is compact addr
      Timer send_timer;
      Timer recv_timer;
      enum flags { ENCRYPTION=1, SEED=2, UTP=4, HOLEPUNCH=8, CONNECTABLE=16 };
      ut_pex_data() : send_timer(60), recv_timer(59) {}
   } pex;
   void AddPEXPeers(BeNode *added,BeNode *added_f,int addr_size);
   void SendPEXPeers();

public:
   int Do();
   TorrentPeer(Torrent *p,const sockaddr_u *a,int tracker_no);
   ~TorrentPeer();
   void PrepareToDie();
   void Connect(int s,IOBuffer *rb);

   bool Failed() const { return error!=0; }
   // NOTE: dereferences `error' unconditionally; call only when Failed().
   const char *ErrorText() const { return error->Text(); }
   const char *GetName() const;
   const char *GetLogContext() { return GetName(); }

   bool ActivityTimedOut() const { return activity_timer.Stopped(); }
   bool NotConnected() const { return sock==-1; }
   bool Disconnected() const { return passive && NotConnected(); }
   // Connected means the peer id is known and both buffers exist.
   bool Connected() const { return peer_id && send_buf && recv_buf; }
   bool Active() const { return Connected() && (am_interested || peer_interested); }
   // The peer has every piece of the parent torrent (and there is at least one piece).
   bool Complete() const { return peer_complete_pieces==parent->total_pieces && parent->total_pieces>0; }
   bool Seed() const { return Complete() || upload_only; }
   bool AddressEq(const TorrentPeer *o) const;
   bool IsPassive() const { return passive; }
   const sockaddr_u& GetAddress() const { return addr; }

   const char *Status();
};

// Black list of addresses; each entry is a Timer stored in `bl'.
class TorrentBlackList : private ProtoLog
{
   xmap_p<Timer> bl;
   void check_expire();
public:
   // Tells whether address a is currently listed.
   bool Listed(const sockaddr_u &a);
   // Adds address a; t is a time specification string, "1h" by default.
   void Add(const sockaddr_u &a,const char *t="1h");
};

// Task bound to a socket and its remote address, with its own receive
// buffer and timeout timer.
// NOTE(review): presumably reads the start of an incoming connection and
// hands it to Torrent::Dispatch - confirm in the implementation.
class TorrentDispatcher : public SMTask, protected ProtoLog
{
   int sock;
   const sockaddr_u addr;
   SMTaskRef<IOBuffer> recv_buf;
   Timer timeout_timer;
   xstring_c peer_name;
public:
   TorrentDispatcher(int s,const sockaddr_u *a);
   ~TorrentDispatcher();
   int Do();
   // Log lines are prefixed with the stored peer name.
   const char *GetLogContext() { return peer_name; }
};

#include "Job.h"

// Job wrapper around a Torrent task.
class TorrentJob : public Job
{
   SMTaskRef<Torrent> torrent;
   bool completed;
   bool done;
public:
   TorrentJob(Torrent *);
   ~TorrentJob();
   int Do();
   // Returns the `done' flag.
   int Done() { return done; }
   xstring& FormatStatus(xstring&,int v,const char *tab);
   void ShowRunStatus(const SMTaskRef<StatusLine>& s);
   int AcceptSig(int);
   void PrepareToDie();
};

#endif//TORRENT_H