Blob Blame History Raw
/*
 * lftp - file transfer program
 *
 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <config.h>
#include <stdlib.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sha1.h>
#include <dirent.h>

#include "Torrent.h"
#include "TorrentTracker.h"
#include "DHT.h"
#include "log.h"
#include "url.h"
#include "misc.h"
#include "plural.h"
CDECL_BEGIN
#include "human.h"
CDECL_END

// Settings used by the torrent code.  Each entry starts with the setting
// name followed by its default value; the remaining fields (where given)
// name ResMgr validators.  The table ends with a zeroed entry.
// NOTE(review): the exact meaning of the 3rd/4th fields is defined by
// ResType, which is declared elsewhere - confirm there.
static ResType torrent_vars[] = {
   {"torrent:port-range", "6881-6889", ResMgr::RangeValidate, ResMgr::NoClosure},
   {"torrent:max-peers", "60", ResMgr::UNumberValidate},
   {"torrent:save-metadata", "yes", ResMgr::BoolValidate, ResMgr::NoClosure},
   {"torrent:stop-min-ppr", "1.4", ResMgr::FloatValidate},
   {"torrent:stop-on-ratio", "2.0", ResMgr::FloatValidate},
   {"torrent:seed-max-time", "30d", ResMgr::TimeIntervalValidate},
   {"torrent:seed-min-peers", "3", ResMgr::UNumberValidate},
   {"torrent:ip", "", ResMgr::IPv4AddrValidate, ResMgr::NoClosure},
   {"torrent:retracker", ""},
   {"torrent:use-dht", "yes", ResMgr::BoolValidate, ResMgr::NoClosure},
   {"torrent:timeout", "7d", ResMgr::TimeIntervalValidate, ResMgr::NoClosure},
#if INET6
   // only present when IPv6 support is compiled in
   {"torrent:ipv6", "", ResMgr::IPv6AddrValidate, ResMgr::NoClosure},
#endif
   {0}
};
// Hands the table above to ResDecls (presumably registering the settings).
static ResDecls torrent_vars_register(torrent_vars);

#if INET6
#ifdef HAVE_IFADDRS_H
#include <ifaddrs.h>
#endif
// Scans the local network interfaces for an IPv6 address that is not
// unspecified, loopback, link-local, site-local or multicast.
// Returns the textual form of the first such address in a temporary
// xstring buffer, or 0 when none is found, when the interface list
// cannot be obtained, or when <ifaddrs.h> is not available.
static const char *FindGlobalIPv6Address()
{
#ifdef HAVE_IFADDRS_H
   struct ifaddrs *ifaddrs=0;
   // on failure there is no list to walk or to free
   if(getifaddrs(&ifaddrs)==-1)
      return 0;
   for(struct ifaddrs *ifa=ifaddrs; ifa; ifa=ifa->ifa_next) {
      if(ifa->ifa_addr && ifa->ifa_addr->sa_family==AF_INET6) {
	 struct in6_addr *addr=&((struct sockaddr_in6*)ifa->ifa_addr)->sin6_addr;
	 if(!IN6_IS_ADDR_UNSPECIFIED(addr) && !IN6_IS_ADDR_LOOPBACK(addr)
	 && !IN6_IS_ADDR_LINKLOCAL(addr) && !IN6_IS_ADDR_SITELOCAL(addr)
	 && !IN6_IS_ADDR_MULTICAST(addr)) {
	    char *buf=xstring::tmp_buf(INET6_ADDRSTRLEN);
	    // do not return an unformatted buffer; try the next address
	    if(!inet_ntop(AF_INET6, addr, buf, INET6_ADDRSTRLEN))
	       continue;
	    freeifaddrs(ifaddrs);
	    return buf;
	 }
      }
   }
   freeifaddrs(ifaddrs);
#endif
   return 0;
}
#endif

// One-time class initialization; later calls return immediately.
// With IPv6 support compiled in and an empty torrent:ipv6 setting, the
// setting is filled with the address found by FindGlobalIPv6Address().
void Torrent::ClassInit()
{
   static bool inited;
   if(!inited)
   {
      inited=true;

#if INET6
      const char *ipv6=ResMgr::Query("torrent:ipv6",0);
      if(*ipv6)
         return;  // configured explicitly, nothing to detect

      ipv6=FindGlobalIPv6Address();
      if(!ipv6)
         return;
      ProtoLog::LogNote(9,"found IPv6 address: %s",ipv6);
      ResMgr::Set("torrent:ipv6",0,ipv6);
#endif
   }
}

// Definitions of the static (class-wide) members of Torrent.

// peer id and key; both are generated in the constructor when still unset
xstring Torrent::my_peer_id;
xstring Torrent::my_key;
unsigned Torrent::my_key_num;
// registered torrents, keyed by info hash (see AddTorrent)
xmap<Torrent*> Torrent::torrents;
// listeners created by StartListener (listener) and StartListenerUDP (listener_udp)
SMTaskRef<TorrentListener> Torrent::listener;
SMTaskRef<TorrentListener> Torrent::listener_udp;
// IPv4 DHT instance, created by StartDHT
SMTaskRef<DHT> Torrent::dht;
#if INET6
// IPv6 counterparts of the listeners and the DHT instance
SMTaskRef<TorrentListener> Torrent::listener_ipv6;
SMTaskRef<TorrentListener> Torrent::listener_ipv6_udp;
SMTaskRef<DHT> Torrent::dht_ipv6;
#endif
// both are reset by RemoveTorrent when the last torrent is removed
SMTaskRef<FDCache> Torrent::fd_cache;
Ref<TorrentBlackList> Torrent::black_list;

void Torrent::StartDHT()
{
   if(!ResMgr::QueryBool("torrent:use-dht",0)) {
      StopDHT();
      StopListenerUDP();
      return;
   }

   if(dht)
      return;

   StartListenerUDP();

   const char *home=get_lftp_cache_dir();
   const char *nodename=get_nodename();

   mkdir(xstring::format("%s/DHT",home),0700);

   const char *ip=ResMgr::Query("torrent:ip",0);
   if(!ip || !ip[0])
      ip="127.0.0.1";
   sockaddr_compact ip_packed;
   ip_packed.get_space(4);
   inet_pton(AF_INET,ip,ip_packed.get_non_const());
   ip_packed.set_length(4);
   xstring node_id;
   DHT::MakeNodeId(node_id,ip_packed);
   dht=new DHT(AF_INET,node_id);
   dht->state_file.setf("%s/DHT/ipv4-%s",home,nodename);
   if(listener_udp->GetPort())
      dht->Load();

#if INET6
   ip=ResMgr::Query("torrent:ipv6",0);
   if(!ip || !ip[0])
      ip="::1";
   ip_packed.get_space(16);
   inet_pton(AF_INET6,ip,ip_packed.get_non_const());
   ip_packed.set_length(16);
   DHT::MakeNodeId(node_id,ip_packed);
   dht_ipv6=new DHT(AF_INET6,node_id);
   dht_ipv6->state_file.setf("%s/DHT/ipv6-%s",home,nodename);
   if(listener_ipv6_udp->GetPort())
      dht_ipv6->Load();
#endif
}
// Saves the DHT state and drops the DHT instance(s); no-op when the
// DHT is not running.
void Torrent::StopDHT()
{
   if(dht) {
      dht->Save();
      dht=0;
#if INET6
      dht_ipv6->Save();
      dht_ipv6=0;
#endif
   }
}

// Creates the listener(s) unless an IPv4 listener already exists.
void Torrent::StartListener()
{
   if(!listener) {
      listener=new TorrentListener(AF_INET);
      listener->Roll(); // try to allocate ipv4 port first
#if INET6
      listener_ipv6=new TorrentListener(AF_INET6);
#endif
   }
}
// Drops the references to the listeners created by StartListener().
void Torrent::StopListener()
{
   listener=0;
#if INET6
   listener_ipv6=0;
#endif
}
// Creates the datagram listener(s) unless an IPv4 one already exists.
void Torrent::StartListenerUDP()
{
   if(!listener_udp) {
      listener_udp=new TorrentListener(AF_INET,SOCK_DGRAM);
#if INET6
      listener_ipv6_udp=new TorrentListener(AF_INET6,SOCK_DGRAM);
#endif
   }
}
// Drops the references to the listeners created by StartListenerUDP().
void Torrent::StopListenerUDP()
{
   listener_udp=0;
#if INET6
   listener_ipv6_udp=0;
#endif
}

// Constructs a torrent in its initial (idle) state.
//   mf - metainfo url; also passed to rate_limit
//   c  - stored as cwd
//   od - stored as output_dir
// All counters and flags are zeroed, limits get built-in defaults and
// Reconfig(0) is called afterwards.  The class-wide peer id and key are
// generated here on first use.
Torrent::Torrent(const char *mf,const char *c,const char *od)
   : metainfo_url(mf),
     pieces_timer(10),
     cwd(c), output_dir(od), rate_limit(mf),
     seed_timer("torrent:seed-max-time",0),
     timeout_timer("torrent:timeout",0),
     optimistic_unchoke_timer(30), peers_scan_timer(1),
     am_interested_timer(1), shutting_down_timer(60),
     dht_announce_timer(10*60),
     dht_announce_count(0), dht_announce_count_ipv6(0)
{
   shutting_down=false;
   complete=false;
   end_game=false;
   is_private=false;
   validating=false;
   force_valid=false;
   build_md=false;
   stop_if_complete=false;
   stop_if_known=false;
   md_saved=false;
   validate_index=0;
   metadata_size=0;
   info=0;
   pieces=0;
   piece_length=0;
   total_pieces=0;
   last_piece_length=0;
   total_length=0;
   total_sent=0;
   total_recv=0;
   total_left=0;
   complete_pieces=0;
   connected_peers_count=0;
   active_peers_count=0;
   complete_peers_count=0;
   am_interested_peers_count=0;
   am_not_choking_peers_count=0;
   // built-in defaults, set before Reconfig(0) is called below
   max_peers=60;
   seed_min_peers=3;
   stop_on_ratio=2;
   stop_min_ppr=1;
   last_piece=TorrentPeer::NO_PIECE;
   min_piece_sources=0;
   avg_piece_sources=0;
   pieces_available_pct=0;
   current_min_ppr=0;
   current_max_ppr=0;
   Reconfig(0);

   if(!my_peer_id) {
      // fixed prefix + low 16 bits of the pid + current unix time, in hex
      my_peer_id.set("-lftp47-");
      my_peer_id.appendf("%04x",(unsigned)getpid() & 0xffff);
      my_peer_id.appendf("%08x",(unsigned)now.UnixTime());
      assert(my_peer_id.length()==PEER_ID_LEN);
   }
   if(!my_key) {
      // ten random bytes in hex, plus a random numeric key
      for(int i=0; i<10; i++)
	 my_key.appendf("%02x",unsigned(random()/13%256));
      my_key_num=random();
   }
   dht_announce_timer.Stop();
}

// Nothing is released explicitly in the destructor body.
Torrent::~Torrent()
{
}

// Returns true when no tracker is active any more, or when we are
// shutting down and the shutdown timer has expired.
bool Torrent::TrackersDone() const
{
   const bool grace_expired=(shutting_down && shutting_down_timer.Stopped());
   if(grace_expired)
      return true;
   bool any_active=false;
   for(int t=0; !any_active && t<trackers.count(); t++)
      any_active=trackers[t]->IsActive();
   return !any_active;
}
// Non-zero once the torrent is shutting down and the trackers are done.
int Torrent::Done() const
{
   if(!shutting_down)
      return false;
   return TrackersDone();
}

// Calls Shutdown() on every tracker, in list order.
void Torrent::ShutdownTrackers() const
{
   int t=0;
   while(t<trackers.count()) {
      trackers[t]->Shutdown();
      t++;
   }
}
// Begins shutting the torrent down: restarts the shutdown timer, shuts
// the trackers down, denounces from the DHT and releases resources.
// Repeated calls are ignored.
void Torrent::Shutdown()
{
   if(!shutting_down) {
      Enter(this);
      LogNote(3,"Shutting down...");
      shutting_down=true;
      shutting_down_timer.Reset();
      ShutdownTrackers();
      DenounceDHT();
      PrepareToDie();
      Leave();
   }
}

// Registers t under its info hash unless that hash is already present.
// Registering the first torrent also starts the listener and the DHT.
void Torrent::AddTorrent(Torrent *t)
{
   if(FindTorrent(t->GetInfoHash()))
      return;  // this info hash is registered already
   const bool first_torrent=(GetTorrentsCount()==0);
   if(first_torrent) {
      StartListener();
      StartDHT();
   }
   torrents.add(t->GetInfoHash(),t);
}
// Unregisters t if it is the torrent registered under its info hash.
// Removing the last torrent stops the listeners and the DHT and drops
// the fd cache and the black list.
void Torrent::RemoveTorrent(Torrent *t)
{
   if(FindTorrent(t->info_hash)!=t)
      return;
   torrents.remove(t->GetInfoHash());
   if(GetTorrentsCount()!=0)
      return;  // other torrents still need the shared services
   StopListener();
   StopDHT();
   StopListenerUDP();
   fd_cache=0;
   black_list=0;
}

// Drops the metainfo copy, the builder and all peers, then unregisters
// this torrent if it is the one registered under its info hash.
void Torrent::PrepareToDie()
{
   metainfo_copy=0;
   building=0;
   peers.unset();
   if(!info_hash)
      return;
   if(FindTorrent(info_hash)==this)
      RemoveTorrent(this);
}

// Records e as the cause of failure, logs it and shuts the torrent
// down.  Only the first error is kept; later calls return at once.
void Torrent::SetError(Error *e)
{
   if(invalid_cause)
      return;
   invalid_cause=e;
   const char *severity=(invalid_cause->IsFatal()?"Fatal error":"Transient error");
   LogError(0,"%s: %s",severity,invalid_cause->Text());
   Shutdown();
}
void Torrent::SetError(const char *msg)
{
   SetError(Error::Fatal(msg));
}

// Ratio of bytes sent to bytes we hold (total_length-total_left).
// Returns 0 when nothing was sent or nothing is held yet.
double Torrent::GetRatio() const
{
   if(total_sent==0)
      return 0;
   if(total_length==total_left)
      return 0;  // avoids dividing by zero below
   return total_sent/double(total_length-total_left);
}

// Forwards to the piece's set_downloader() for the given block, passing
// o and n together with the number of blocks in that piece.
// NOTE(review): o/n look like the old and new downloader - confirm in
// TorrentPiece::set_downloader.
void Torrent::SetDownloader(unsigned piece,unsigned block,const TorrentPeer *o,const TorrentPeer *n)
{
   piece_info[piece].set_downloader(block,o,n,BlocksInPiece(piece));
}

// Accounts len bytes sent from piece p: updates the total, the send
// rate and the piece's ratio (by the sent fraction of the piece length).
void Torrent::AccountSend(unsigned p,unsigned len)
{
   total_sent+=len;
   send_rate.Add(len);
   piece_info[p].add_ratio(float(len)/PieceLength(p));
}
// Accounts len received bytes: updates the total and the receive rate.
// The piece index p is not used here.
void Torrent::AccountRecv(unsigned p,unsigned len)
{
   total_recv+=len;
   recv_rate.Add(len);
}

// Looks up a key of the required type in a metadata dictionary.
// Returns the node, or 0 after reporting an error when the key is
// missing or has another type.
BeNode *Torrent::Lookup(xmap_p<BeNode>& dict,const char *name,BeNode::be_type_t type)
{
   BeNode *found=dict.lookup(name);
   if(found && found->type==type)
      return found;
   if(!found)
      SetError(xstring::format("Meta-data: `%s' key missing",name));
   else
      SetError(xstring::format("Meta-data: wrong `%s' type, must be %s",name,BeNode::TypeName(type)));
   return 0;
}
void Torrent::InitTranslation()
{
   const char *charset="UTF-8"; // default
   recv_translate_utf8=new DirectedBuffer(DirectedBuffer::GET);
   recv_translate_utf8->SetTranslation(charset,true);
   if(metainfo_tree) {
      BeNode *b_charset=metainfo_tree->lookup("encoding",BeNode::BE_STR);
      if(b_charset)
	 charset=b_charset->str;
   }
   recv_translate=new DirectedBuffer(DirectedBuffer::GET);
   recv_translate->SetTranslation(charset,true);
}
// Fills node->str_lc with node->str passed through recv_translate,
// unless str_lc is already set.
void Torrent::TranslateString(BeNode *node) const
{
   if(!node->str_lc) {
      recv_translate->ResetTranslation();
      recv_translate->PutTranslated(node->str);
      node->str_lc.nset(recv_translate->Get(),recv_translate->Size());
      // drop the converted text from the translator buffer
      recv_translate->Skip(recv_translate->Size());
   }
}
// Same as TranslateString, but uses the translator fixed to UTF-8.
void Torrent::TranslateStringFromUTF8(BeNode *node) const
{
   if(!node->str_lc) {
      recv_translate_utf8->ResetTranslation();
      recv_translate_utf8->PutTranslated(node->str);
      node->str_lc.nset(recv_translate_utf8->Get(),recv_translate_utf8->Size());
      // drop the converted text from the translator buffer
      recv_translate_utf8->Skip(recv_translate_utf8->Size());
   }
}

// Stores the SHA1 digest of str in buf; buf ends up with a length of
// SHA1_DIGEST_SIZE bytes.
void Torrent::SHA1(const xstring& str,xstring& buf)
{
   // reserve room for the digest, compute it in place, then fix the length
   buf.get_space(SHA1_DIGEST_SIZE);
   sha1_buffer(str.get(),str.length(),buf.get_non_const());
   buf.set_length(SHA1_DIGEST_SIZE);
}

void Torrent::ValidatePiece(unsigned p)
{
   const xstring& buf=Torrent::RetrieveBlock(p,0,PieceLength(p));
   bool valid=false;
   if(buf.length()==PieceLength(p)) {
      xstring& sha1=xstring::get_tmp();
      SHA1(buf,sha1);
      if(building) {
	 building->SetPiece(p,sha1);
	 valid=true;
      } else {
	 valid=!memcmp(pieces->get()+p*SHA1_DIGEST_SIZE,sha1,SHA1_DIGEST_SIZE);
      }
   }
   if(!valid) {
      if(building) {
	 SetError("File validation error");
	 return;
      }
      if(buf.length()==PieceLength(p))
	 LogError(11,"piece %u digest mismatch",p);
      if(my_bitfield->get_bit(p)) {
	 total_left+=PieceLength(p);
	 complete_pieces--;
	 my_bitfield->set_bit(p,0);
      }
      SetBlocksAbsent(p);
   } else {
      LogNote(11,"piece %u ok",p);
      if(!my_bitfield->get_bit(p)) {
	 total_left-=PieceLength(p);
	 complete_pieces++;
	 my_bitfield->set_bit(p,1);
	 piece_info[p].free_block_map();
      }
   }
}

// Three-way comparison: 1 if a>b, -1 if a<b, 0 otherwise.
template<typename T>
static inline int cmp(T a,T b)
{
   return a>b ? 1 : (a<b ? -1 : 0);
}

// Torrent whose piece_info is consulted by PiecesNeededCmp; it is set
// right before pieces_needed is sorted.
static Torrent *cmp_torrent;
// Orders piece indices by ascending sources count, then by index.
int Torrent::PiecesNeededCmp(const unsigned *a,const unsigned *b)
{
   const int sources_a=cmp_torrent->piece_info[*a].get_sources_count();
   const int sources_b=cmp_torrent->piece_info[*b].get_sources_count();
   const int by_sources=cmp(sources_a,sources_b);
   return by_sources ? by_sources : cmp(*a,*b);
}
int Torrent::PeersCompareActivity(const SMTaskRef<TorrentPeer> *p1,const SMTaskRef<TorrentPeer> *p2)
{
   TimeDiff i1((*p1)->activity_timer.TimePassed());
   TimeDiff i2((*p2)->activity_timer.TimePassed());
   return cmp(i1.Seconds(),i2.Seconds());
}
int Torrent::PeersCompareRecvRate(const SMTaskRef<TorrentPeer> *p1,const SMTaskRef<TorrentPeer> *p2)
{
   float r1((*p1)->peer_recv_rate.Get());
   float r2((*p2)->peer_recv_rate.Get());
   int c=cmp(r1,r2);
   if(c) return c;
   return PeersCompareSendRate(p1,p2);
}
int Torrent::PeersCompareSendRate(const SMTaskRef<TorrentPeer> *p1,const SMTaskRef<TorrentPeer> *p2)
{
   float r1((*p1)->peer_send_rate.Get());
   float r2((*p2)->peer_send_rate.Get());
   return cmp(r1,r2);
}

// True when the seed timer has expired, or when a positive
// stop_on_ratio is reached and the minimum per-piece ratio is at least
// stop_min_ppr.
bool Torrent::SeededEnough() const
{
   const bool ratio_reached=(stop_on_ratio>0
      && GetRatio()>=stop_on_ratio
      && GetMinPerPieceRatio()>=stop_min_ppr);
   if(ratio_reached)
      return true;
   return seed_timer.Stopped();
}

void Torrent::CleanPeers()
{
   Enter();
   // remove uninteresting peers and request more
   for(int i=0; i<peers.count(); i++) {
      const TorrentPeer *peer=peers[i];
      if(peer->ActivityTimedOut()) {
	 LogNote(4,"removing uninteresting peer %s (%s)",peer->GetName(),peers[i]->Status());
	 BlackListPeer(peer,"2h");
	 peers.remove(i--);
      }
   }
   Leave();
}

// Calls Start() on every tracker, in list order.
void Torrent::StartTrackers()
{
   int t=0;
   while(t<trackers.count()) {
      trackers[t]->Start();
      t++;
   }
}

// Returns the first non-zero port among the listeners, checked in this
// order: listener, listener_ipv6, listener_udp, listener_ipv6_udp.
// Returns 0 when no listener exists or none has a port.
int Torrent::GetPort()
{
   int port=0;
   if(listener && !port)
      port=listener->GetPort();
#if INET6
   if(listener_ipv6 && !port)
      port=listener_ipv6->GetPort();
#endif
   // assign instead of returning, so that a zero port here still lets
   // the ipv6 udp listener be consulted below
   if(listener_udp && !port)
      port=listener_udp->GetPort();
#if INET6
   if(listener_ipv6_udp && !port)
      port=listener_ipv6_udp->GetPort();
#endif
   return port;
}

// Announces this torrent on the running DHT instance(s) and restarts
// the announce timer.  Private torrents are never announced.
void Torrent::AnnounceDHT()
{
   if(!is_private) {
      CleanPeers();
      if(dht)
         dht->AnnouncePeer(this);
#if INET6
      if(dht_ipv6)
         dht_ipv6->AnnouncePeer(this);
#endif
      dht_announce_timer.Reset();
   }
}
// Calls DenouncePeer() for this torrent on the running DHT
// instance(s); nothing is done for private torrents.
void Torrent::DenounceDHT()
{
   if(!is_private) {
      if(dht)
         dht->DenouncePeer(this);
#if INET6
      if(dht_ipv6)
         dht_ipv6->DenouncePeer(this);
#endif
   }
}
// Counts one announce for the given address family; other families
// are ignored.
void Torrent::DHT_Announced(int af)
{
   if(af==AF_INET) {
      dht_announce_count++;
      return;
   }
#if INET6
   if(af==AF_INET6)
      dht_announce_count_ipv6++;
#endif
}
const char *Torrent::DHT_Status() const
{
   if(!HasDHT() || Private())
      return "";
   static xstring status;
   status.nset("",0);
   if(dht_announce_count || dht_announce_count_ipv6) {
      status.append(_("announced via "));
      if(dht_announce_count)
	 status.appendf("ipv4:%d",dht_announce_count);
#if INET6
      if(dht_announce_count_ipv6) {
	 if(dht_announce_count)
	    status.append(", ");
	 status.appendf("ipv6:%d",dht_announce_count_ipv6);
      }
#endif
   }
   if(!dht_announce_timer.Stopped() && !validating) {
      if(status.length()>0)
	 status.append("; ");
      status.appendf(_("next announce in %s"),
	 dht_announce_timer.TimeLeft().toString(TimeInterval::TO_STR_TRANSLATE));
   }
   return status;
}

const char *Torrent::GetMetadataPath() const
{
   if(!QueryBool("torrent:save-metadata",0))
      return NULL;
   xstring& path=xstring::cat(get_lftp_data_dir(),"/torrent",NULL);
   mkdir(path,0700);
   path.append("/md");
   mkdir(path,0700);
   path.append('/');
   info_hash.hexdump_to(path);
   return path;
}

// Writes the metadata to the file named by GetMetadataPath().
// Returns true when it was saved before or is completely written now;
// false when saving is disabled or the file could not be written.
bool Torrent::SaveMetadata() const
{
   if(md_saved)
      return true;  // saved already.

   const char *md_path=GetMetadataPath();
   if(!md_path)
      return false;

   const int fd=open(md_path,O_CREAT|O_WRONLY,0600);
   if(fd<0) {
      LogError(9,"open(%s): %s",md_path,strerror(errno));
      return false;
   }

   const int expected=metadata.length();
   const int written=write(fd,metadata.get(),expected);
   const int write_errno=errno;  // keep it before further system calls
   ftruncate(fd,expected);       // cut off anything beyond the new size
   close(fd);

   if(written!=expected) {
      if(written<0)
         LogError(9,"write(%s): %s",md_path,strerror(write_errno));
      else
         LogError(9,"write(%s): short write (only wrote %d bytes)",md_path,written);
      return false;
   }
   return true;  // no error, fd is closed.
}
bool Torrent::LoadMetadata(const char *path)
{
   int fd=open(path,O_RDONLY);
   if(fd<0) {
      LogError(9,"open(%s): %s",path,strerror(errno));
      return false;
   }

   struct stat st;
   if(fstat(fd,&st)==-1) {
      close(fd);
      return false;
   }
   int bytes_to_read=st.st_size;

   xstring md;
   int res=read(fd,md.add_space(bytes_to_read),bytes_to_read);
   int saved_errno=errno;

   close(fd);

   if(res==bytes_to_read) {
      // no error, fd is closed.
      md.add_commit(res);

      xstring new_info_hash;
      SHA1(md,new_info_hash);
      if(info_hash && info_hash.ne(new_info_hash)) {
	 LogError(9,"cached metadata does not match info_hash");
	 return false;
      }
      LogNote(9,"got metadata from %s",path);
      if(SetMetadata(md)) {
	 md_saved=true;
	 return true;
      }
      return false;
   }

   if(res<0)
      LogError(9,"read(%s): %s",path,strerror(saved_errno));
   else
      LogError(9,"read(%s): short read (only read %d bytes)",path,res);
   return false;
}

void Torrent::FetchMetadataFromURL(const char *url)
{
   ParsedURL u(url,true);
   if(!u.proto) {
      u.proto.set("file");
      u.path.set(url);  // undo %xx translation
   }
   LogNote(9,"Retrieving meta-data from `%s'...\n",url);
   FileCopyPeer *metainfo_src=new FileCopyPeerFA(&u,FA::RETRIEVE);
   FileCopyPeer *metainfo_dst=new FileCopyPeerMemory(10000000);
   metainfo_copy=new FileCopy(metainfo_src,metainfo_dst,false);
}

// Uses cached metadata when it is readable and loads correctly;
// otherwise registers the torrent without metadata so that it can be
// obtained from peers.
void Torrent::StartMetadataDownload()
{
   const char *cached=GetMetadataPath();
   const bool cache_readable=(cached && access(cached,R_OK)>=0);
   // we have the metadata cached
   if(cache_readable && LoadMetadata(cached)) {
      if(!stop_if_known) {
         Startup();
      } else {
         LogNote(2,"found cached metadata, stopping");
         Shutdown();
      }
      return;
   }
   md_download.nset("",0);
   // add torrent without metadata to announce it and get peers to get MD from.
   AddTorrent(this);
}

// Sets the total length and derives everything that depends on it and
// on piece_length: piece counts, last piece length, blocks per piece,
// the bitfield and the per-piece info array.  piece_length must be set
// (non-zero) beforehand, as it is used as a divisor.
void Torrent::SetTotalLength(off_t tl)
{
   total_length=tl;
   LogNote(4,"Total length is %llu",total_length);
   total_left=total_length;

   // the last piece is a full one when the length divides evenly
   last_piece_length=total_length%piece_length;
   if(!last_piece_length)
      last_piece_length=piece_length;

   // both counts round up
   total_pieces=(total_length+piece_length-1)/piece_length;
   blocks_in_piece=(piece_length+BLOCK_SIZE-1)/BLOCK_SIZE;
   blocks_in_last_piece=(last_piece_length+BLOCK_SIZE-1)/BLOCK_SIZE;

   my_bitfield=new BitField(total_pieces);
   piece_info=new TorrentPiece[total_pieces]();
}

// Enters validation mode starting from index 0 and resets the receive
// rate.
void Torrent::StartValidating()
{
   validate_index=0;
   validating=true;
   recv_rate.Reset();
}

// Installs md as the metadata of this torrent.
// The SHA1 of md must equal info_hash when that is already known
// (ParseMagnet sets it from a magnet link, for example); otherwise it
// becomes the info hash.  The metadata is then parsed and checked:
// piece length, name, file list or single length, and the piece
// digests.  Returns true on success; on failure an error is set with
// SetError() and false is returned.
bool Torrent::SetMetadata(const xstring& md)
{
   metadata.set(md);
   timeout_timer.Reset();

   xstring new_info_hash;
   SHA1(metadata,new_info_hash);
   if(info_hash && info_hash.ne(new_info_hash)) {
      metadata.unset();
      SetError("metadata does not match info_hash");
      return false;
   }
   info_hash.set(new_info_hash);

   if(!info) {
      // no parsed tree yet: parse the metadata and wrap it into a
      // top-level dictionary under the `info' key
      int rest;
      info=BeNode::Parse(metadata,metadata.length(),&rest);
      if(!info) {
	 SetError("cannot parse metadata");
	 return false;
      }
      xmap_p<BeNode> d;
      d.add("info",info);
      metainfo_tree=new BeNode(&d);
      InitTranslation();
   }

   BeNode *b_piece_length=Lookup(info,"piece length",BeNode::BE_INT);
   if(!b_piece_length || b_piece_length->num<1024 || b_piece_length->num>INT_MAX/4) {
      SetError("Meta-data: invalid piece length");
      return false;
   }
   piece_length=b_piece_length->num;
   LogNote(4,"Piece length is %u",piece_length);

   // name: prefer `name.utf-8', then `name', then the info hash in hex
   BeNode *b_name=info->lookup("name",BeNode::BE_STR);
   BeNode *b_name_utf8=info->lookup("name.utf-8",BeNode::BE_STR);
   if(b_name_utf8) {
      TranslateStringFromUTF8(b_name_utf8);
      name.set(b_name_utf8->str_lc);
   } else if(b_name) {
      TranslateString(b_name);
      name.set(b_name->str_lc);
   } else {
      name.truncate();
      info_hash.hexdump_to(name);
   }
   Reconfig(0);

   BeNode *files=info->lookup("files");
   if(!files) {
      // single file: the length is stored directly in `info'
      BeNode *length=Lookup(info,"length",BeNode::BE_INT);
      if(!length || length->num<0) {
	 SetError("Meta-data: invalid or missing length");
	 return false;
      }
      total_length=length->num;
   } else {
      // several files: a list of dictionaries with `length' and `path'
      if(files->type!=BeNode::BE_LIST) {
	 SetError("Meta-data: wrong `info/files' type, must be LIST");
	 return false;
      }
      total_length=0;
      for(int i=0; i<files->list.length(); i++) {
	 if(files->list[i]->type!=BeNode::BE_DICT) {
	    // each entry must be a dictionary; name the type that is required
	    SetError(xstring::format("Meta-data: wrong `info/files[%d]' type, must be DICT",i));
	    return false;
	 }
	 BeNode *f=Lookup(files->list[i]->dict,"length",BeNode::BE_INT);
	 if(!f || f->num<0) {
	    SetError("Meta-data: invalid or missing file length");
	    return false;
	 }
	 if(!Lookup(files->list[i]->dict,"path",BeNode::BE_LIST)) {
	    SetError("Meta-data: file path missing");
	    return false;
	 }
	 total_length+=f->num;
      }
   }
   // the local `files' node hides the member, hence this->files
   this->files=new TorrentFiles(files,this);
   SetTotalLength(total_length);

   BeNode *b_pieces=Lookup(info,"pieces",BeNode::BE_STR);
   if(!b_pieces) {
      SetError("Meta-data: `pieces' missing");
      return false;
   }
   pieces=&b_pieces->str;
   // one SHA1 digest per piece is required
   if(pieces->length()!=SHA1_DIGEST_SIZE*total_pieces) {
      SetError("Meta-data: invalid `pieces' length");
      return false;
   }

   is_private=info->lookup_int("private");

   return true;
}

void Torrent::Startup()
{
   if(!info_hash || !metadata)
      SetError("missing metadata");
   if(shutting_down)
      return;
   Torrent *other=FindTorrent(info_hash);
   if(other) {
      if(other!=this) {
	 SetError("This torrent is already running");
	 return;
      }
   } else {
      AddTorrent(this);
   }

   if(!building)
      md_saved=SaveMetadata();

   if(!force_valid && !building) {
      StartValidating();
   } else {
      my_bitfield->set_range(0,total_pieces,1);
      complete_pieces=total_pieces;
      total_left=0;
      complete=true;
      seed_timer.Reset();
      dht_announce_timer.Stop();
   }
   RestartPeers();
}

// Calls Restart() on every peer, in list order.
void Torrent::RestartPeers()
{
   int idx=0;
   while(idx<peers.count()) {
      peers[idx]->Restart();
      idx++;
   }
}

// Maps a base32 character to its 5-bit value: letters (either case)
// give 0..25, digits '2'..'7' give 26..31 and the padding character
// '=' gives 0.  Any other character gives -1.
static int base32_char_to_value(char c)
{
   if(c=='=')
      return 0;
   if('a'<=c && c<='z')
      return c-'a';
   if('A'<=c && c<='Z')
      return c-'A';
   if('2'<=c && c<='7')
      return 26+(c-'2');
   return -1;
}
// Decodes the base32 text and appends the resulting bytes to out.
// Decoding stops silently at the first invalid character, at a data
// character following padding, or at padding that is not preceded by
// enough pending data bits; bytes appended so far are kept.
void base32_decode(const char *base32,xstring& out)
{
   unsigned acc=0;      // bits 8..15 hold the byte being assembled
   int acc_bits=0;      // number of pending bits in acc
   int padding_bits=0;  // how many of the pending bits came from '='
   for(const char *in=base32; *in; in++) {
      const char ch=*in;
      const bool is_pad=(ch=='=');
      if(is_pad && acc_bits<=padding_bits)
         return;
      if(!is_pad && padding_bits>0)
         return;
      const int value=base32_char_to_value(ch);
      if(value==-1)
         return;
      acc|=((value&31)<<(11-acc_bits));
      acc_bits+=5;
      if(is_pad)
         padding_bits+=5;
      if(acc_bits>=8) {
         out.append(char((acc>>8)&255));
         acc<<=8;
         acc_bits-=8;
      }
   }
   // flush what is left as one more byte
   if(acc_bits>0)
      out.append(char((acc>>8)&255));
}

// Parses the parameter part of a magnet link (m0 is the text after
// "magnet:?") and starts the metadata download.
// Recognized parameters:
//   xt - must be urn:btih:<hash>; a 40-character value is decoded as
//        hex, anything else as base32; the result must be 20 bytes
//   tr - tracker url; added to the tracker list unless it failed
//   dn - used as the torrent name
// Other parameters and those without `=' are ignored.  An error is set
// when the hash is missing or invalid, or when a torrent with the same
// info hash is already registered.
void Torrent::ParseMagnet(const char *m0)
{
   // strtok modifies its argument, so work on a copy
   char *m=alloca_strdup(m0);
   for(char *p=strtok(m,"&"); p; p=strtok(NULL,"&")) {
      char *v=strchr(p,'=');
      if(!v)
	 continue;
      // split into name (p) and value (v), then url-decode the value
      *v++=0;
      v=xstring::get_tmp(v).url_decode(URL_DECODE_PLUS).get_non_const();
      if(!strcmp(p,"xt")) {
	 if(strncmp(v,"urn:btih:",9)) {
	    SetError("Only BitTorrent magnet links are supported");
	    return;
	 }
	 v+=9;
	 xstring& btih=xstring::get_tmp(v);
	 if(btih.length()==40) {
	    // hex form
	    btih.hex_decode();
	    if(btih.length()!=20) {
	       SetError("Invalid value of urn:btih in magnet link");
	       return;
	    }
	    info_hash.move_here(btih);
	 } else {
	    // base32 form
	    info_hash.truncate();
	    base32_decode(v,info_hash);
	    if(info_hash.length()!=20) {
	       info_hash.unset();
	       SetError("Invalid value of urn:btih in magnet link");
	       return;
	    }
	 }
      }
      else if(!strcmp(p,"tr")) {
	 SMTaskRef<TorrentTracker> new_tracker(new TorrentTracker(this,v));
	 if(!new_tracker->Failed()) {
	    new_tracker->tracker_no=trackers.count();
	    trackers.append(new_tracker.borrow());
	 }
      }
      else if(!strcmp(p,"dn")) {
	 name.set(v);
      }
   }
   if(!info_hash) {
      SetError("missing urn:btih in magnet link");
      return;
   }
   if(FindTorrent(info_hash)) {
      SetError("This torrent is already running");
      return;
   }
   StartMetadataDownload();
}

// Recomputes the statistics over the pieces we do not have yet:
//   min_piece_sources    - smallest sources count among them
//   avg_piece_sources    - sum of the sources counts, scaled by 256 and
//                          divided by the number of missing pieces
//   pieces_available_pct - percentage of them having at least one source
// and then refreshes the per-piece ratio bounds.  When no piece is
// missing the average and the percentage stay 0.
void Torrent::CalcPiecesStats()
{
   min_piece_sources=INT_MAX;
   avg_piece_sources=0;
   pieces_available_pct=0;
   for(unsigned i=0; i<total_pieces; i++) {
      if(my_bitfield->get_bit(i))
	 continue;
      unsigned sc=piece_info[i].get_sources_count();
      if(min_piece_sources>sc)
	 min_piece_sources=sc;
      if(sc==0)
	 continue;
      pieces_available_pct++;
      avg_piece_sources+=sc;
   }
   // total_pieces-complete_pieces is the divisor; skip the division
   // when it is zero (nothing is missing)
   if(total_pieces!=complete_pieces) {
      avg_piece_sources=avg_piece_sources*256/(total_pieces-complete_pieces);
      pieces_available_pct=pieces_available_pct*100/(total_pieces-complete_pieces);
   }
   CalcPerPieceRatio();
}

// Recomputes the smallest and largest per-piece ratio over all pieces.
// The minimum starts from 1024 and the maximum from 0.
void Torrent::CalcPerPieceRatio()
{
   current_min_ppr=1024;
   current_max_ppr=0;
   unsigned idx=0;
   while(idx<total_pieces) {
      const float ratio=piece_info[idx++].get_ratio();
      if(ratio<current_min_ppr)
         current_min_ppr=ratio;
      if(ratio>current_max_ppr)
         current_max_ppr=ratio;
   }
}

// Rebuilds pieces_needed (missing pieces that have at least one
// source), sorted with PiecesNeededCmp, enters End Game mode when every
// missing piece has a downloader, and refreshes the piece statistics.
void Torrent::RebuildPiecesNeeded()
{
   pieces_needed.truncate();
   bool all_missing_have_downloader=true;
   for(unsigned idx=0; idx<total_pieces; idx++) {
      const bool have_it=my_bitfield->get_bit(idx);
      if(!have_it) {
         if(!piece_info[idx].has_a_downloader())
            all_missing_have_downloader=false;
         // a missing piece without sources is skipped, cleanup included
         if(piece_info[idx].has_no_sources())
            continue;
         pieces_needed.append(idx);
      }
      piece_info[idx].cleanup();
   }
   if(all_missing_have_downloader && !end_game) {
      LogNote(1,"entering End Game mode");
      end_game=true;
   }
   // the comparator finds the torrent through this file-scope pointer
   cmp_torrent=this;
   pieces_needed.qsort(PiecesNeededCmp);
   CalcPiecesStats();
   pieces_timer.Reset();
}

// Prepares building a torrent from a local path.  A plain file is
// finished immediately; a directory is queued for scanning; anything
// else, or a path that cannot be stat()ed, sets the error.
TorrentBuild::TorrentBuild(const char *path) :
   top_path(path),
   name(basename_ptr(path)),
   done(false),
   total_length(0),
   piece_length(0)
{
   name.rtrim('/');

   struct stat sb;
   if(stat(path,&sb)==-1) {
      error=SysError();
      return;
   }
   if(S_ISREG(sb.st_mode)) {
      total_length=sb.st_size;
      LogNote(10,"single file %s, size %lld",path,(long long)sb.st_size);
      Finish();
      return;
   }
   if(S_ISDIR(sb.st_mode)) {
      // start scanning from the top directory
      QueueDir("");
      return;
   }
   error=new Error(-1,"Need a plain file or directory",true);
}
// Passes s through the translator set up in Finish() (target charset
// "UTF-8") and returns the result in a temporary buffer.  Returns s
// itself when there is no translator or s is null.
const char *TorrentBuild::lc_to_utf8(const char *s)
{
   if(!s || !translate)
      return s;

   translate->ResetTranslation();
   translate->PutTranslated(s);
   const char *converted;
   int converted_len;
   translate->Get(&converted,&converted_len);
   translate->Skip(converted_len);
   return xstring::get_tmp(converted,converted_len);
}
void TorrentBuild::Finish()
{
   done=true;
   LogNote(10,"scan finished, total_length=%lld",(long long)total_length);

   translate=new DirectedBuffer(DirectedBuffer::PUT);
   translate->SetTranslation("UTF-8",false);

   xmap_p<BeNode> *b_info=new xmap_p<BeNode>();
   b_info->add("name",new BeNode(lc_to_utf8(name)));

   piece_length=16*1024;
   off_t length_scan=piece_length*2200;
   while(length_scan<=total_length) {
      piece_length*=2;
      length_scan*=2;
   }
   b_info->add("piece length",new BeNode(piece_length));

   if(files.count()==0) {
      b_info->add("length",new BeNode(total_length));
   } else {
      files.Sort(FileSet::BYNAME);
      files.rewind();
      xarray_p<BeNode> *b_files=new xarray_p<BeNode>();
      for(FileInfo *fi=files.curr(); fi; fi=files.next()) {
	 xarray_p<BeNode> *path=new xarray_p<BeNode>();
	 const char *name_utf8=lc_to_utf8(fi->name);
	 char *p=alloca_strdup(name_utf8);
	 for(p=strtok(p,"/"); p; p=strtok(NULL,"/"))
	    path->append(new BeNode(p));
	 xmap_p<BeNode> *b_file=new xmap_p<BeNode>();
	 b_file->add("path",new BeNode(path));
	 b_file->add("length",new BeNode(fi->size));
	 b_files->append(new BeNode(b_file));
      }
      b_info->add("files",new BeNode(b_files));
   }
   info=new BeNode(b_info);
}
// Appends the digest of piece p to the collected `pieces' string.
// Pieces must be supplied in order: the assertion checks that exactly
// p digests of 20 bytes each were stored before.
void TorrentBuild::SetPiece(unsigned p,const xstring& sha)
{
   assert(pieces.length()==p*20); // require sequential building
   pieces.append(sha);
}
// Adds the collected piece digests to the `info' dictionary under the
// `pieces' key and returns the result of packing the dictionary.
const xstring& TorrentBuild::GetMetadata()
{
   info->dict.add("pieces",new BeNode(pieces));
   return info->Pack();
}
// Returns a status line for the scan in a temporary buffer: the number
// of files found so far and, when known, the directory being scanned.
// Returns an empty string once the build is done.
const xstring& TorrentBuild::Status() const
{
   if(Done())
      return xstring::get_tmp("");
   const char *curr=CurrPath();
   int count=files.count();
   // CurrPath() may return null (Do() checks for that as well), so the
   // pointer is tested before its first character
   if(curr && curr[0])
      return xstring::format(plural("%d file$|s$ found, now scanning %s",count),count,curr);
   return xstring::format(plural("%d file$|s$ found",count),count);
}
// One step of the directory scan: reads the current directory
// completely, adds its plain files, queues its sub-directories and
// moves on to the next queued directory.  When no directory is left the
// build is finished.  Returns STALL when done or when opendir failed
// with a non-fatal error, MOVED otherwise.
int TorrentBuild::Do()
{
   int m=STALL;
   if(Done())
      return m;

   const char *curr_path=CurrPath();
   if(!curr_path) {
      // nothing left to scan
      Finish();
      return MOVED;
   }
   const char *full_path=dir_file(top_path,curr_path);
   // keep a private copy, as dir_file is called again below
   full_path=alloca_strdup(full_path);

   DIR *dir=opendir(full_path);
   if(!dir) {
      if(NonFatalError(errno))
	 return m;
      // a failure is an error only if there are no other directories queued
      if(dirs_to_scan.Count()<=1)
	 error=SysError();
      else
	 LogError(0,"opendir(%s): %s",full_path,strerror(errno));
      NextDir();
      return MOVED;
   }
   LogNote(10,"scanning %s",full_path);
   struct dirent *entry;
   while((entry=readdir(dir))!=0) {
      if(!strcmp(entry->d_name,".") || !strcmp(entry->d_name,".."))
	 continue;
      struct stat st;
      const char *path=dir_file(full_path,entry->d_name);
      // lstat does not follow symbolic links
      if(lstat(path,&st)==-1) {
	 LogError(0,"stat(%s): %s",path,strerror(errno));
	 continue;
      }
      // files and directories are recorded relative to the top path
      if(S_ISREG(st.st_mode))
	 AddFile(dir_file(curr_path,entry->d_name),&st);
      else if(S_ISDIR(st.st_mode))
	 QueueDir(dir_file(curr_path,entry->d_name));
      else
	 LogNote(10,"ignoring %s (not a directory nor a plain file)",path);
   }
   closedir(dir);
   NextDir();
   return MOVED;
}
// Records a file found by the scan: adds an entry with the size from st
// to the file set and adds that size to the total length.
void TorrentBuild::AddFile(const char *path,struct stat *st)
{
   FileInfo *fi=new FileInfo(path);
   fi->SetSize(st->st_size);
   files.Add(fi);
   total_length+=st->st_size;
   LogNote(10,"adding %s, size %lld",path,(long long)fi->size);
}

// One scheduling step of the torrent.
// The stages are tried in this order:
//  1. no-progress timeout check;
//  2. (build_md mode) wait for TorrentBuild to scan local files, then start
//     validating them;
//  3. obtain and parse the metainfo (magnet link, bare info-hash or URL),
//     create trackers, add DHT bootstrap nodes, apply the `info' dictionary;
//  4. validate one piece per call while `validating' is set;
//  5. start trackers, announce to DHT, count peers, and do the periodic
//     unchoke / needed-pieces / seeding upkeep.
// Returns MOVED when a stage changed state or failed, otherwise the value
// of m (initially STALL).
int Torrent::Do()
{
   int m=STALL;
   if(Done() || shutting_down)
      return m;
   // fail when the torrent is neither complete nor being built and
   // timeout_timer has run out.
   if(!complete && !building && timeout_timer.Stopped()) {
      SetError("timed out with no progress");
      return MOVED;
   }
   // build mode: create the file list from a local scan, then validate
   // (i.e. hash) the files.
   if(build_md && !files) {
      if(!building)
	 building=new TorrentBuild(metainfo_url);
      if(!building->Done())
	 return m;
      m=MOVED;
      if(building->Failed()) {
	 SetError(building->ErrorText());
	 return m;
      }
      InitTranslation();
      name.set(building->name);
      piece_length=building->piece_length;
      SetTotalLength(building->total_length);
      files=new TorrentFiles(building->GetFilesNode(),this);
      output_dir.set(building->GetBaseDirectory());
      StartValidating();
   }
   if(!metainfo_tree && metainfo_url && !md_download && !build_md) {
      // retrieve metainfo if don't have already.
      if(!metainfo_copy) {
	 if(metainfo_url.begins_with("magnet:?")) {
	    ParseMagnet(metainfo_url+8);
	    return MOVED;
	 }
	 // a 40-hex-digit argument that does not name an accessible file
	 // is taken as an info-hash.
	 if(metainfo_url.length()==40 && strspn(metainfo_url,"0123456789ABCDEFabcdef")==40
	 && access(metainfo_url,0)==-1) {
	    xstring& btih=xstring::get_tmp(metainfo_url);
	    btih.hex_decode();
	    assert(btih.length()==20);
	    info_hash.move_here(btih);
	    if(FindTorrent(info_hash)) {
	       SetError("This torrent is already running");
	       return MOVED;
	    }
	    StartMetadataDownload();
	    return MOVED;
	 }
	 FetchMetadataFromURL(metainfo_url);
	 m=MOVED;
      }
      if(metainfo_copy->Error()) {
	 SetError(Error::Fatal(metainfo_copy->ErrorText()));
	 metainfo_copy=0;
	 return MOVED;
      }
      if(!metainfo_copy->Done())
	 return m;
      LogNote(9,"meta-data EOF\n");
      int rest;
      const char *metainfo_buf;
      int metainfo_len;
      metainfo_copy->put->Get(&metainfo_buf,&metainfo_len);
      metainfo_tree=BeNode::Parse(metainfo_buf,metainfo_len,&rest);
      metainfo_copy=0;
      if(!metainfo_tree) {
	 SetError("Meta-data parse error");
	 return MOVED;
      }
      if(rest>0) {
	 SetError("Junk at the end of Meta-data");
	 return MOVED;
      }

      InitTranslation();

      LogNote(10,"Received meta-data:");
      Log::global->Write(10,metainfo_tree->Format());

      if(metainfo_tree->type!=BeNode::BE_DICT) {
	 SetError("Meta-data: wrong top level type, must be DICT");
         return MOVED;
      }

      // Additional tracker from the torrent:retracker setting. It is
      // cleared below as soon as an announce-list URL starts with it.
      const char *retracker=ResMgr::Query("torrent:retracker",GetName());
      int retracker_len=xstrlen(retracker);
      // announce-list is a list of lists of URLs; each inner list becomes
      // one TorrentTracker holding all its URLs.
      BeNode *announce_list=metainfo_tree->lookup("announce-list",BeNode::BE_LIST);
      if(announce_list) {
	 for(int i=0; i<announce_list->list.length(); i++) {
	    BeNode *announce_list1=announce_list->list[i];
	    if(announce_list1->type!=BeNode::BE_LIST)
	       continue;
	    SMTaskRef<TorrentTracker> new_tracker;
	    for(int j=0; j<announce_list1->list.length(); j++) {
	       BeNode *announce=announce_list1->list[j];
	       if(announce->type!=BeNode::BE_STR)
		  continue;
	       if(retracker_len && !strncmp(retracker,announce->str,retracker_len))
		  retracker=0, retracker_len=0;
	       if(!new_tracker)
		  new_tracker=new TorrentTracker(this,announce->str);
	       else
		  new_tracker->AddURL(announce->str);
	    }
	    if(new_tracker && !new_tracker->Failed()) {
	       new_tracker->tracker_no=trackers.count();
	       trackers.append(new_tracker.borrow());
	    }
	 }
      }

      // fall back to the single `announce' key when announce-list gave
      // no usable tracker.
      if(trackers.count()==0) {
	 const xstring& announce=metainfo_tree->lookup_str("announce");
	 if(announce) {
	    SMTaskRef<TorrentTracker> new_tracker(
	       new TorrentTracker(this,announce));
	    if(!new_tracker->Failed())
	       trackers.append(new_tracker.borrow());
	 }
      }

      if(retracker_len) {
	 SMTaskRef<TorrentTracker> new_tracker(new TorrentTracker(this,retracker));
	 if(!new_tracker->Failed()) {
	    new_tracker->tracker_no=trackers.count();
            trackers.append(new_tracker.borrow());
	 }
      }

      // `nodes' is a list of [host, port] pairs used as DHT bootstrap nodes.
      BeNode *nodes=metainfo_tree->lookup("nodes",BeNode::BE_LIST);
      if(nodes && dht) {
	 for(int i=0; i<nodes->list.count(); i++) {
	    BeNode *n=nodes->list[i];
	    if(n->type!=BeNode::BE_LIST || n->list.count()<2)
	       continue;
	    BeNode *b_host=n->list[0];
	    BeNode *b_port=n->list[1];
	    if(b_host->type!=BeNode::BE_STR || b_port->type!=BeNode::BE_INT)
	       continue;
	    // NOTE(review): this range check also rejects port 65535 -
	    // confirm that is intended.
	    if(b_port->num<=0 || b_port->num>=65535)
	       continue;
	    ParsedURL u;
	    u.host.set(b_host->str);
	    u.port.set(xstring::format("%u",(unsigned)b_port->num));
	    xstring_ca node(u.Combine());
	    dht->AddBootstrapNode(node);
#if INET6
	    // NOTE(review): only `dht' is null-checked above; presumably
	    // dht_ipv6 exists whenever dht does - confirm.
	    dht_ipv6->AddBootstrapNode(node);
#endif
	 }
      }

      info=Lookup(metainfo_tree,"info",BeNode::BE_DICT);
      if(!info)
         return MOVED;

      if(SetMetadata(info->str))
	 Startup();
      m=MOVED;
      if(Done())
	 return m;
   }
   if(peers_scan_timer.Stopped())
      ScanPeers();
   // validation handles a single piece per call and returns MOVED until
   // the last piece has been checked.
   if(validating) {
      ValidatePiece(validate_index++);
      if(validate_index<total_pieces) {
	 recv_rate.Add(piece_length);
	 return MOVED;
      }
      recv_rate.Add(last_piece_length);
      validating=false;
      recv_rate.Reset();
      if(total_left==0) {
	 complete=true;
	 seed_timer.Reset();
	 if(stop_if_complete) {
	    LogNote(2,"torrent is already complete, stopping");
	    Shutdown();
	    return MOVED;
	 }
      }
      // end of a build: every piece must have validated; then the metadata
      // is taken from the builder and a magnet link is printed to stdout.
      if(building) {
	 if(!complete) {
	    SetError("File validation error");
	    return MOVED;
	 }
	 if(!SetMetadata(building->GetMetadata()))
	    return MOVED;
	 building=0;
	 xstring magnet("magnet:?xt=urn:btih:");
	 magnet.append(info_hash.hexdump());
	 magnet.appendf("&xl=%lld",(long long)total_length);
	 magnet.append("&dn=");
	 magnet.append_url_encoded(name,URL_PATH_UNSAFE);
	 printf("%s\n",magnet.get());
	 Startup();
      }
      RestartPeers();
      dht_announce_timer.Stop();
   }
   if(GetPort())
      StartTrackers();
   if(dht_announce_timer.Stopped())
      AnnounceDHT();

   // count peers
   connected_peers_count=0;
   active_peers_count=0;
   complete_peers_count=0;
   for(int i=0; i<peers.count(); i++) {
      connected_peers_count+=peers[i]->Connected();
      active_peers_count+=peers[i]->Active();
      complete_peers_count+=peers[i]->Complete();
   }

   // the remaining upkeep needs the metadata.
   if(!metadata)
      return m;

   if(optimistic_unchoke_timer.Stopped())
      OptimisticUnchoke();

   // rebuild lists of needed pieces
   if(!complete && (pieces_needed.count()==0 || pieces_timer.Stopped()))
      RebuildPiecesNeeded();

   // when seeding: refresh per-piece ratio periodically and stop once
   // SeededEnough() says so.
   if(complete) {
      if(pieces_timer.Stopped()) {
	 CalcPerPieceRatio();
	 pieces_timer.Reset();
      }
      if(SeededEnough()) {
	 Shutdown();
	 return MOVED;
      }
   }

   return m;
}

// Put the peer's address on the black list for the given time.
// Nothing is done for passive peers or when GetTorrentsCount() is zero.
// The black list object is created on first use.
void Torrent::BlackListPeer(const TorrentPeer *peer,const char *timeout)
{
   if(peer->IsPassive())
      return;
   if(GetTorrentsCount()==0)
      return;
   if(!black_list)
      black_list=new TorrentBlackList();
   black_list->Add(peer->GetAddress(),timeout);
}
// True when a black list exists and it lists the peer's address.
bool Torrent::BlackListed(const TorrentPeer *peer)
{
   if(!black_list)
      return false;
   return black_list->Listed(peer->GetAddress());
}

// Incoming connections are accepted only when not validating and the
// decline timer has expired.
bool Torrent::CanAccept() const
{
   if(validating)
      return false;
   return decline_timer.Stopped();
}

void Torrent::Accept(int s,const sockaddr_u *addr,IOBuffer *rb)
{
   if(!CanAccept()) {
      LogNote(4,"declining new connection");
      Delete(rb);
      close(s);
      return;
   }
   TorrentPeer *p=new TorrentPeer(this,addr,TorrentPeer::TR_ACCEPTED);
   p->Connect(s,rb);
   AddPeer(p);
}

// Add a peer to the list, taking over the passed object.
// Black-listed peers are deleted. If a peer with the same address is
// already known, the new one replaces it only when the new one is connected
// and the old one is not; otherwise the new one is deleted.
void Torrent::AddPeer(TorrentPeer *peer)
{
   if(BlackListed(peer)) {
      Delete(peer);
      return;
   }
   for(int idx=0; idx<peers.count(); idx++) {
      if(!peers[idx]->AddressEq(peer))
         continue;
      const bool replace_old=peer->Connected() && !peers[idx]->Connected();
      if(replace_old)
         peers[idx]=peer;
      else
         Delete(peer);
      return;
   }
   peers.append(peer);
}

// Number of peers to ask a tracker for.
// Returns -1 when shutting down. Otherwise the shortfall against the target
// (seed_min_peers when complete, half of max_peers when not), and when that
// shortfall exceeds 1 it is divided (rounding up) among the trackers whose
// timer has less than 60 left.
int Torrent::GetWantedPeersCount() const
{
   if(shutting_down)
      return -1;

   const int target=complete?seed_min_peers:max_peers/2;
   const int have=peers.count();
   int wanted=(target>have ? target-have : 0);
   if(wanted<=1)
      return wanted;

   // count almost ready for request trackers
   int ready_trackers=0;
   for(int t=0; t<trackers.count(); t++) {
      if(trackers[t]->tracker_timer.TimeLeft()<60)
         ready_trackers++;
   }
   // request peers from all trackers evenly (round up).
   if(ready_trackers)
      wanted=(wanted+ready_trackers-1)/ready_trackers;
   return wanted;
}

// Build the relative path of a file described by dictionary node `p':
// the torrent name followed by '/'-separated path components.
// The "path.utf-8" list is preferred over "path"; each string component is
// translated with the matching translation method and its `str_lc' is used.
// A name equal to ".." or starting with '/' gets a '_' prefix, and a ".."
// component gets a '_' prefix too.
// The result lives in a static buffer that is overwritten by the next call.
const char *Torrent::MakePath(BeNode *p) const
{
   void (Torrent::*translate)(BeNode*)const=&Torrent::TranslateStringFromUTF8;
   BeNode *components=p->lookup("path.utf-8",BeNode::BE_LIST);
   if(!components) {
      components=p->lookup("path",BeNode::BE_LIST);
      translate=&Torrent::TranslateString;
   }

   static xstring result;
   result.set(name);
   if(result.eq("..") || result[0]=='/')
      result.set_substr(0,0,"_",1);

   const int n=components->list.count();
   for(int i=0; i<n; i++) {
      BeNode *elem=components->list[i];
      if(elem->type!=BeNode::BE_STR)
         continue;
      (this->*translate)(elem);
      result.append('/');
      if(elem->str_lc.eq(".."))
         result.append('_');
      result.append(elem->str_lc);
   }
   return result;
}
const char *Torrent::FindFileByPosition(unsigned piece,unsigned begin,off_t *f_pos,off_t *f_tail) const
{
   off_t target_pos=(off_t)piece*piece_length+begin;
   TorrentFile *file=files->FindByPosition(target_pos);
   if(!file)
      return 0;

   *f_pos=target_pos-file->pos;
   *f_tail=file->length-*f_pos;

   return file->path;
}

// Build the file table of a torrent.
// With a `files' list node: one entry per list element, with its path from
// Torrent::MakePath, its "length" value, and a start position equal to the
// sum of the lengths of the preceding elements.
// Without it: a single entry named after the torrent covering its total
// length. The table is finally sorted with pos_cmp.
TorrentFiles::TorrentFiles(const BeNode *files,const Torrent *t)
{
   if(files) {
      const int count=files->list.length();
      grow_space(count);
      set_length(count);
      off_t offset=0;
      for(int i=0; i<count; i++) {
         BeNode *node=files->list[i];
         const off_t len=node->lookup_int("length");
         file(i)->set(t->MakePath(node),offset,len);
         offset+=len;
      }
   } else {
      grow_space(1);
      set_length(1);
      file(0)->set(t->GetName(),0,t->TotalLength());
   }
   qsort(pos_cmp);
}
// Binary search for the file whose range contains `pos'.
// Returns 0 when no entry contains it.
TorrentFile *TorrentFiles::FindByPosition(off_t pos)
{
   int lo=0;
   int hi=length()-1;
   // invariant: the target element, if any, is within [lo,hi]
   while(lo<=hi) {
      const int mid=(lo+hi)/2;
      TorrentFile *cand=file(mid);
      if(cand->contains_pos(pos))
         return cand;
      if(cand->pos>pos)
         hi=mid-1;
      else
         lo=mid+1;
   }
   return 0;
}

// Create an empty descriptor cache: clean_timer is constructed with 1,
// max_time is 30 (compared against entry age in Clean) and max_count is 16
// (the entry count above which Clean closes entries).
FDCache::FDCache()
   : clean_timer(1)
{
   max_time=30;
   max_count=16;
}
// Close every cached descriptor and drop all entries on destruction.
FDCache::~FDCache()
{
   CloseAll();
}
void FDCache::Clean()
{
   for(int i=0; i<3; i++) {
      xmap<FD>& cache=this->cache[i];
      for(const FD *f=&cache.each_begin(); f->last_used; f=&cache.each_next()) {
	 if(f->fd==-1) {
	    if(f->last_used+1<now.UnixTime())
	       cache.remove(cache.each_key());
	    continue;
	 }
	 if(f->last_used+max_time<now.UnixTime()) {
	    ProtoLog::LogNote(9,"closing %s",cache.each_key().get());
	    close(f->fd);
	    cache.remove(cache.each_key());
	 }
      }
   }
   while(Count()>max_count && CloseOne())
      /*empty*/;
   if(Count()>0)
      clean_timer.Reset();
}
// Scheduler step: run Clean() whenever the clean timer has expired.
// Always returns STALL.
int FDCache::Do()
{
   if(!clean_timer.Stopped())
      return STALL;
   Clean();
   return STALL;
}
void FDCache::Close(const char *name)
{
   const xstring &n=xstring::get_tmp(name);
   for(int i=0; i<3; i++) {
      const FD& f=cache[i].lookup(n);
      if(f.last_used!=0) {
	 if(f.fd!=-1) {
	    ProtoLog::LogNote(9,"closing %s",name);
#ifdef HAVE_POSIX_FADVISE
	    if(i==O_RDONLY) // avoid filling up the cache
	       posix_fadvise(f.fd,0,0,POSIX_FADV_DONTNEED);
#endif
	    close(f.fd);
	 }
	 cache[i].remove(n);
      }
   }
}
void FDCache::CloseAll()
{
   for(int i=0; i<3; i++) {
      xmap<FD>& cache=this->cache[i];
      for(const FD *f=&cache.each_begin(); f->last_used; f=&cache.each_next()) {
	 if(f->fd!=-1) {
	    ProtoLog::LogNote(9,"closing %s",cache.each_key().get());
	    close(f->fd);
	 }
	 cache.remove(cache.each_key());
      }
   }
}
// Close the least recently used open descriptor across all three mode maps
// and remove its entry. Entries with fd==-1 are ignored.
// Returns false when there is no open descriptor to close.
bool FDCache::CloseOne()
{
   int oldest_mode=0;
   int oldest_fd=-1;
   // same type as the timestamps stored in last_used (now.UnixTime()),
   // so that the comparison below is not done on a truncated value.
   time_t oldest_time=0;
   const xstring *oldest_key=0;
   for(int i=0; i<3; i++) {
      xmap<FD>& cache=this->cache[i];
      for(const FD *f=&cache.each_begin(); f->last_used; f=&cache.each_next()) {
	 if(f->fd==-1)
	    continue;
	 if(oldest_key==0 || f->last_used<oldest_time) {
	    oldest_key=&cache.each_key();
	    oldest_time=f->last_used;
	    oldest_fd=f->fd;
	    oldest_mode=i;
	 }
      }
   }
   if(!oldest_key)
      return false;
   if(oldest_fd!=-1) {
      ProtoLog::LogNote(9,"closing %s",oldest_key->get());
      close(oldest_fd);
   }
   cache[oldest_mode].remove(*oldest_key);
   return true;
}
// Total number of entries over the three mode maps.
int FDCache::Count() const
{
   int total=0;
   for(int mode=0; mode<3; mode++)
      total+=cache[mode].count();
   return total;
}
// Return a descriptor for `file' opened with flags `m', using the cache.
// The low two bits of `m' (the access mode) select one of the three maps.
// A cached entry is returned as is; for a cached failure (fd==-1) errno is
// restored from the entry. Otherwise the file is opened (closing cached
// descriptors one at a time on EMFILE/ENFILE) and the result - success or
// failure - is stored in the cache.
// `size': when non-zero, used for space pre-allocation of empty files
// opened O_RDWR (if file:use-fallocate is set) and for the fadvise hints
// on files opened O_RDONLY.
// Returns the descriptor or -1.
int FDCache::OpenFile(const char *file,int m,off_t size)
{
   int ci=m&3;
   assert(ci<3);
   FD& f=cache[ci].lookup_Lv(file);
   if(f.last_used!=0) {
      // cache hit: refresh the timestamp of a good descriptor, or
      // replay the errno of a remembered failure.
      if(f.fd!=-1)
	 f.last_used=now.UnixTime();
      else
	 errno=f.saved_errno;
      return f.fd;
   }
   if(ci==O_RDONLY) {
      // O_RDWR also will do, check if we have it.
      const FD& f_rw=cache[O_RDWR].lookup(file);
      if(f_rw.last_used!=0 && f_rw.fd!=-1) {
	 // don't update last_used to expire it and reopen with proper mode
	 return f_rw.fd;
      }
   }
   Clean();
   clean_timer.Reset();
   ProtoLog::LogNote(9,"opening %s",file);
   int fd;
   // retry as long as the failure is "too many open files" and a cached
   // descriptor could be closed.
   do {
      fd=open(file,m,0664);
   } while(fd==-1 && (errno==EMFILE || errno==ENFILE) && CloseOne());
   // the entry is stored even when fd==-1, together with errno.
   FD new_entry = {fd,errno,now.UnixTime()};
   cache[ci].add(file,new_entry);
   if(fd!=-1)
      fcntl(fd,F_SETFD,FD_CLOEXEC);
   if(fd==-1 || size==0)
      return fd;
   if(ci==O_RDWR && QueryBool("file:use-fallocate",0)) {
      struct stat st;
      // check if it is newly created file, then allocate space
      if(fstat(fd,&st)!=-1 && st.st_size==0) {
	 if(lftp_fallocate(fd,size)==-1 && errno!=ENOSYS && errno!=EOPNOTSUPP) {
	    ProtoLog::LogError(9,"space allocation for %s (%lld bytes) failed: %s",
	       file,(long long)size,strerror(errno));
	 }
      }
   }
#ifdef HAVE_POSIX_FADVISE
   if(ci==O_RDONLY) {
      // validation mode (when validating, size>0)
      posix_fadvise(fd,0,size,POSIX_FADV_SEQUENTIAL);
      posix_fadvise(fd,0,size,POSIX_FADV_NOREUSE);
   }
#endif//HAVE_POSIX_FADVISE
   return fd;
}

// Open `file' (relative to output_dir) with flags `m' through the shared
// descriptor cache, which is created on first use.
// On EMFILE/ENFILE peers are dropped one at a time to free descriptors and
// the open is retried. While validating, the result is returned as is
// (a failure stays in the cache). Otherwise a failed entry is removed from
// the cache, and on ENOENT the intermediate directories of `file' are
// created once and the open is retried.
// Returns the descriptor, or -1 with errno describing the open failure.
int Torrent::OpenFile(const char *file,int m,off_t size)
{
   bool did_mkdir=false;
   if(!fd_cache)
      fd_cache=new FDCache();
try_again:
   const char *cf=dir_file(output_dir,file);
   int fd=fd_cache->OpenFile(cf,m,size);
   while(fd==-1 && (errno==EMFILE || errno==ENFILE) && peers.count()>0) {
      peers.chop();  // free an fd
      // FDCache::OpenFile has cached the failure; without removing that
      // entry the call below would only return the cached error again and
      // this loop would drop every peer without retrying the open.
      fd_cache->Close(cf);
      fd=fd_cache->OpenFile(cf,m,size);
   }
   if(validating)
      return fd;
   if(fd==-1) {
      // keep the errno of the failed open: it is tested just below and is
      // reported by the callers.
      const int open_errno=errno;
      fd_cache->Close(cf); // remove negative cache.
      errno=open_errno;
   }
   if(fd==-1 && errno==ENOENT && !did_mkdir) {
      LogError(10,"open(%s): %s",cf,strerror(errno));
      // create every leading directory of `file', one prefix at a time.
      const char *sl=strchr(file,'/');
      while(sl)
      {
	 if(sl>file) {
	    if(mkdir(cf=dir_file(output_dir,xstring::get_tmp(file,sl-file)),0775)==-1 && errno!=EEXIST)
	       LogError(9,"mkdir(%s): %s",cf,strerror(errno));
	 }
	 sl=strchr(sl+1,'/');
      }
      did_mkdir=true;
      goto try_again;
   }
   return fd;
}
// Drop `file' (relative to output_dir) from the descriptor cache, if the
// cache exists.
void Torrent::CloseFile(const char *file) const
{
   if(fd_cache)
      fd_cache->Close(dir_file(output_dir,file));
}

// Remove the first occurrence of `piece' from pieces_needed, if present.
void Torrent::SetPieceNotWanted(unsigned piece)
{
   int idx=0;
   while(idx<pieces_needed.count() && pieces_needed[idx]!=piece)
      idx++;
   if(idx<pieces_needed.count())
      pieces_needed.remove(idx);
}

// Smaller of two values. This is a plain macro: the selected argument is
// evaluated twice, so arguments must be free of side effects.
#define MIN(a,b) ((a)<(b)?(a):(b))

// Store a received block (`len' bytes at offset `begin' of `piece') to disk.
// All peers are first told to cancel their request for this block. The data
// is written with pwrite, file by file, since a block may span several
// files. Then the covered blocks are marked present; when that makes the
// piece whole it is validated. A piece failing validation is reported to
// src_peer; a good one resets the timeout, is announced to all peers, and
// if it was the last one the torrent switches to the complete state.
// Any open/write failure sets the torrent error and returns.
void Torrent::StoreBlock(unsigned piece,unsigned begin,unsigned len,const char *buf,TorrentPeer *src_peer)
{
   for(int i=0; i<peers.count(); i++)
      peers[i]->CancelBlock(piece,begin);

   // first block index and block count are taken now, because begin and
   // len are consumed by the write loop below.
   unsigned b=begin/BLOCK_SIZE;
   int bc=(len+BLOCK_SIZE-1)/BLOCK_SIZE;

   off_t f_pos=0;
   off_t f_rest=len;
   while(len>0) {
      // NOTE(review): FindFileByPosition returns 0 when no file contains
      // the position; the result is used unchecked here - presumably the
      // caller guarantees a valid range, confirm.
      const char *file=FindFileByPosition(piece,begin,&f_pos,&f_rest);
      int fd=OpenFile(file,O_RDWR|O_CREAT,f_pos+f_rest);
      if(fd==-1) {
	 SetError(xstring::format("open(%s): %s",file,strerror(errno)));
	 return;
      }
      // write no more than what is left of the current file.
      int w=pwrite(fd,buf,MIN(f_rest,len),f_pos);
      int saved_errno=errno;
      if(w==-1) {
	 SetError(xstring::format("pwrite(%s): %s",file,strerror(saved_errno)));
	 return;
      }
      if(w==0) {
	 SetError(xstring::format("pwrite(%s): write error - disk full?",file));
	 return;
      }
      buf+=w;
      begin+=w;
      len-=w;
   }

   while(bc-->0) {
      SetBlockPresent(piece,b++);
   }
   if(AllBlocksPresent(piece) && !my_bitfield->get_bit(piece)) {
      // the bit in my_bitfield is tested again after validation to learn
      // whether the piece was accepted.
      ValidatePiece(piece);
      if(!my_bitfield->get_bit(piece)) {
	 LogError(0,"new piece %u digest mismatch",piece);
	 src_peer->MarkPieceInvalid(piece);
	 return;
      }
      LogNote(3,"piece %u complete",piece);
      timeout_timer.Reset();
      SetPieceNotWanted(piece);
      for(int i=0; i<peers.count(); i++)
	 peers[i]->Have(piece);
      // last piece: become complete, rescan peers and tell the trackers.
      if(my_bitfield->has_all_set() && !complete) {
	 complete=true;
	 seed_timer.Reset();
	 end_game=false;
	 ScanPeers();
	 SendTrackersRequest("completed");
	 recv_rate.Reset();
      }
   }
}
// Send a tracker request with the given event to every tracker that has
// not failed.
void Torrent::SendTrackersRequest(const char *event) const
{
   for(int t=0; t<trackers.count(); t++) {
      if(trackers[t]->Failed())
         continue;
      trackers[t]->SendTrackerRequest(event);
   }
}

// Read `len' bytes at offset `begin' of `piece' from disk, following the
// data across file boundaries.
// Returns a reference to a static buffer (overwritten by the next call).
// The buffer is shorter than requested when a read hits end of file.
// Returns xstring::null when a file cannot be opened or a read fails; only
// the read failure sets the torrent error.
const xstring& Torrent::RetrieveBlock(unsigned piece,unsigned begin,unsigned len)
{
   static xstring buf;
   buf.truncate(0);
   buf.get_space(len);

   off_t f_pos=0;
   off_t f_rest=len;
   while(len>0) {
      // NOTE(review): FindFileByPosition may return 0; the result is used
      // unchecked - presumably the range is validated by callers, confirm.
      const char *file=FindFileByPosition(piece,begin,&f_pos,&f_rest);
      // the size argument is passed only while validating.
      int fd=OpenFile(file,O_RDONLY,validating?f_pos+f_rest:0);
      if(fd==-1)
	 return xstring::null;
      int w=pread(fd,buf.add_space(len),MIN(f_rest,len),f_pos);
      if(w==-1) {
	 SetError(xstring::format("pread(%s): %s",file,strerror(errno)));
	 return xstring::null;
      }
      if(w==0) {
// 	 buf.append_padding(len,'\0');
	 break;
      }
      buf.add_commit(w);
      begin+=w;
      len-=w;
      // while validating, a file is closed as soon as it was read to its end.
      if(validating && w==f_rest)
	 CloseFile(file);
   }
   return buf;
}

// Return the first peer whose peer_id equals p_id, or 0 when none does.
TorrentPeer *Torrent::FindPeerById(const xstring& p_id)
{
   // linear search - peers count<100, called rarely
   const int n=peers.count();
   for(int idx=0; idx<n; idx++) {
      const TorrentPeer *candidate=peers[idx];
      if(!candidate->peer_id.eq(p_id))
         continue;
      return const_cast<TorrentPeer*>(candidate);
   }
   return 0;
}

// Walk the peer list and remove peers that failed, disconnected and timed
// out, are a connection to ourselves, are duplicates, or are seeds while we
// are complete. Every removed peer is also passed to BlackListPeer with a
// time that depends on the reason. Then trims the list with ReducePeers()
// and restarts the scan timer.
void Torrent::ScanPeers() {
   int i=0;
   while(i<peers.count()) {
      const TorrentPeer *peer=peers[i];
      const char *ban_for="2h";
      if(peer->Failed()) {
         LogError(2,"peer %s failed: %s",peer->GetName(),peer->ErrorText());
      } else if(peer->Disconnected() && peer->ActivityTimedOut()) {
         LogNote(4,"peer %s disconnected",peer->GetName());
      } else if(peer->myself) {
         LogNote(4,"removing myself-connected peer %s",peer->GetName());
         ban_for="forever";
      } else if(peer->duplicate) {
         LogNote(4,"removing duplicate peer %s",peer->GetName());
      } else if(complete && peer->Seed()) {
         LogNote(4,"removing unneeded peer %s (%s)",peer->GetName(),peers[i]->Status());
         ban_for="1d";
      } else {
         // keep the peer.
         i++;
         continue;
      }
      if(ban_for)
         BlackListPeer(peer,ban_for);
      // the following peer moves into slot i, so the index stays.
      peers.remove(i);
   }
   ReducePeers();
   peers_scan_timer.Reset();
}

void Torrent::ReducePeers()
{
   if(max_peers>0 && peers.count()>max_peers) {
      // remove least interesting peers.
      peers.qsort(PeersCompareActivity);
      int to_remove=peers.count()-max_peers;
      while(to_remove-->0) {
	 TimeInterval max_idle(peers.last()->activity_timer.TimePassed());
	 LogNote(3,"removing peer %s (too many; idle:%s)",peers.last()->GetName(),
	    max_idle.toString(TimeInterval::TO_STR_TERSE+TimeInterval::TO_STR_TRANSLATE));
	 peers.chop();
	 if(max_idle<60)
	    decline_timer.Set(60-max_idle.Seconds());
      }
   }
   peers.qsort(complete ? PeersCompareSendRate : PeersCompareRecvRate);
   ReduceUploaders();
   ReduceDownloaders();
   UnchokeBestUploaders();
}
// Drop interest in peers when too many are marked interesting.
// Nothing is done while the interested count is below max_uploaders (when
// the GET rate is low) or below min_uploaders+1 (otherwise). Peers are
// visited in list order; the walk stops at the first interesting peer whose
// interest timer shows 30 or less, or once the count is below max_uploaders.
void Torrent::ReduceUploaders()
{
   const bool rate_low=RateLow(RateLimit::GET);
   if(am_interested_peers_count < (rate_low?max_uploaders:min_uploaders+1))
      return;
   // make the slowest uninterested
   for(int idx=0; idx<peers.count(); idx++) {
      TorrentPeer *peer=peers[idx].get_non_const();
      if(!peer->am_interested)
         continue;
      if(peer->interest_timer.TimePassed() <= 30)
         return;
      peer->SetAmInterested(false);
      if(am_interested_peers_count < max_uploaders)
         return;
   }
}
// Choke peers when too many are unchoked.
// Nothing is done while the unchoked count is below max_downloaders (when
// the PUT rate is low) or below min_downloaders+1 (otherwise). Peers are
// visited in list order; only unchoked, interested peers are considered.
// The walk stops at the first such peer whose choke timer shows 30 or less,
// or once the count is below max_downloaders.
void Torrent::ReduceDownloaders()
{
   const bool rate_low=RateLow(RateLimit::PUT);
   if(am_not_choking_peers_count < (rate_low?max_downloaders:min_downloaders+1))
      return;
   // choke the slowest
   for(int idx=0; idx<peers.count(); idx++) {
      TorrentPeer *peer=peers[idx].get_non_const();
      if(peer->am_choking || !peer->peer_interested)
         continue;
      if(peer->choke_timer.TimePassed() <= 30)
         return;
      peer->SetAmChoking(true);
      if(am_not_choking_peers_count < max_downloaders)
         return;
   }
}

// True when metadata is present, no validation is running, the GET rate is
// low, fewer than max_uploaders peers are marked interesting and the
// am_interested timer has expired.
bool Torrent::NeedMoreUploaders()
{
   if(!metadata || validating)
      return false;
   if(!RateLow(RateLimit::GET))
      return false;
   if(!(am_interested_peers_count < max_uploaders))
      return false;
   return am_interested_timer.Stopped();
}
// True when metadata is present, no validation is running, the PUT rate is
// low and fewer than max_downloaders peers are unchoked.
bool Torrent::AllowMoreDownloaders()
{
   if(!metadata || validating)
      return false;
   if(!RateLow(RateLimit::PUT))
      return false;
   return am_not_choking_peers_count < max_downloaders;
}

// Unchoke up to four peers, walking the peer list from its end.
// A peer qualifies when it is connected, its choke timer has expired and
// it is interested. Does nothing without metadata.
void Torrent::UnchokeBestUploaders()
{
   if(!metadata)
      return;

   // unchoke 4 best uploaders
   const int limit=4;

   int unchoked=0;
   for(int idx=peers.count()-1; idx>=0 && unchoked<limit; idx--) {
      TorrentPeer *peer=peers[idx].get_non_const();
      // an unexpired choke timer means the choke status cannot change yet
      const bool eligible=peer->Connected()
         && peer->choke_timer.Stopped()
         && peer->peer_interested;
      if(!eligible)
         continue;
      peer->SetAmChoking(false);
      unchoked++;
   }
}
void Torrent::OptimisticUnchoke()
{
   xarray<TorrentPeer*> choked_peers;
   for(int i=peers.count()-1; i>=0; i--) {
      TorrentPeer *peer=peers[i].get_non_const();
      if(!peer->Connected())
	 continue;
      if(!peer->choke_timer.Stopped())
	 continue;   // cannot change choke status yet
      if(peer->am_choking) {
	 if(!peer->peer_interested) {
	    peer->SetAmChoking(false);
	    continue;
	 }
	 choked_peers.append(peer);
	 if(peer->retry_timer.TimePassed()<60) {
	    // newly connected is more likely to be unchoked
	    choked_peers.append(peer);
	    choked_peers.append(peer);
	 }
      }
   }
   if(choked_peers.count()==0)
      return;
   choked_peers[rand()/13%choked_peers.count()]->SetAmChoking(false);
   optimistic_unchoke_timer.Reset();
}

// Share of the torrent's byte allowance for direction `dir' granted to one
// peer. The allowance from rate_limit is scaled by
//    (peer_rate + min_rate) / (total_rate + active_peers_count*min_rate)
// where for GET the send rates are used and for PUT the receive rates.
// When the divisor is not positive (no peers counted as active and no
// measured rate) the allowance is returned unscaled.
int Torrent::PeerBytesAllowed(const TorrentPeer *peer,RateLimit::dir_t dir)
{
   float peer_rate=(dir==RateLimit::GET ? peer->peer_send_rate : peer->peer_recv_rate).Get();
   float total_rate=(dir==RateLimit::GET ? send_rate : recv_rate).Get();
   const int min_rate = 1024;
   // the more is the opposite rate the more rate allowed, with a minimum
   float bytes = rate_limit.BytesAllowed(dir);
   const float total_weight = total_rate + active_peers_count*min_rate;
   // active_peers_count is recomputed only in Torrent::Do, so it can still
   // be zero here; dividing by zero would give inf/NaN, whose conversion to
   // int is undefined.
   if(total_weight<=0)
      return (int)bytes;
   bytes *= (peer_rate + min_rate) / total_weight;
   return (int)bytes;
}
// Account `b' bytes transferred in direction `dir' with the torrent's
// rate limiter.
void Torrent::PeerBytesUsed(int b,RateLimit::dir_t dir)
{
   rate_limit.BytesUsed(b,dir);
}
// Reload the per-torrent settings (queried with the torrent name as
// closure), reconfigure the rate limiter and, when a listener exists,
// call StartDHT().
void Torrent::Reconfig(const char *name)
{
   const char *closure=GetName();
   max_peers     =ResMgr::Query("torrent:max-peers",closure);
   seed_min_peers=ResMgr::Query("torrent:seed-min-peers",closure);
   stop_on_ratio =ResMgr::Query("torrent:stop-on-ratio",closure);
   stop_min_ppr  =ResMgr::Query("torrent:stop-min-ppr",closure);
   rate_limit.Reconfig(name,metainfo_url);
   if(!listener)
      return;
   StartDHT();
}

// Return a one-line status of the torrent. The first matching state wins:
// metainfo being fetched, validation progress, build progress, metadata
// download progress, shutdown (status of the first active tracker that has
// one), and finally the transfer summary with totals, rates and either the
// completion percentage or the seeding ratios.
const xstring& Torrent::Status()
{
   if(metainfo_copy)
      return xstring::format(_("Getting meta-data: %s"),metainfo_copy->GetStatus());
   if(validating) {
      // the ETA is computed from the size still to validate: the remaining
      // full pieces plus the last piece.
      return xstring::format(_("Validation: %u/%u (%u%%) %s%s"),validate_index,total_pieces,
	 validate_index*100/total_pieces,recv_rate.GetStrS(),
	 recv_rate.GetETAStrFromSize((off_t)(total_pieces-validate_index-1)*piece_length+last_piece_length).get());
   }
   if(building)
      return building->Status();
   if(!metadata && !build_md) {
      if(md_download.length()>0)
	 return xstring::format(_("Getting meta-data: %s"),
	    xstring::format("%u/%u",(unsigned)md_download.length(),(unsigned)metadata_size).get());
      else
	 return xstring::get_tmp(_("Waiting for meta-data..."));
   }
   if(shutting_down) {
      for(int i=0; i<trackers.count(); i++) {
	 if(!trackers[i]->IsActive())
	    continue;
	 const char *status=trackers[i]->Status();
	 if(status[0]) {
	    xstring &s=xstring::get_tmp(_("Shutting down: "));
	    // the tracker number is shown only when there are several.
	    if(trackers.count()>1)
	       s.appendf("%d. ",i+1);
	    s.append(status);
	    return s;
	 }
      }
      return xstring::get_tmp("");
   }
   // also protects the percentage division below.
   if(total_length==0)
      return xstring::get_tmp("");

   char dn_buf[LONGEST_HUMAN_READABLE + 1];
   char up_buf[LONGEST_HUMAN_READABLE + 1];
   xstring& buf=xstring::format("dn:%s %sup:%s %s",
      human_readable(total_recv, dn_buf, human_autoscale|human_SI, 1, 1),
      recv_rate.GetStrS(),
      human_readable(total_sent, up_buf, human_autoscale|human_SI, 1, 1),
      send_rate.GetStrS());
   if(!complete) {
      buf.appendf("complete:%u/%u (%u%%)",complete_pieces,total_pieces,
	 unsigned((total_length-total_left)*100/total_length));
      buf.append(' ');
      // the ETA is shown only when min_piece_sources is non-zero.
      if(min_piece_sources)
	 buf.append(recv_rate.GetETAStrFromSize(total_left));
      if(end_game)
	 buf.append(" end-game");
   } else {
      buf.appendf("complete, ratio:%.2f/%.2f/%.2f",
	 GetMinPerPieceRatio(),GetRatio(),GetMaxPerPieceRatio());
   }
   return buf;
}


// Create a peer of torrent `p' at address `a'; `t_no' is stored as the
// tracker number. The peer starts unconnected, choked in both directions
// and not interested, with the retry, choke and interest timers stopped.
// A reserved or multicast address, or a zero port, puts the peer into the
// error state right away.
TorrentPeer::TorrentPeer(Torrent *p,const sockaddr_u *a,int t_no)
   : timeout_timer(360), retry_timer(30), keepalive_timer(120),
     choke_timer(10), interest_timer(10), activity_timer(300),
     msg_ext_metadata(0), msg_ext_pex(0), metadata_size(0)
{
   parent=p;
   tracker_no=t_no;
   addr=*a;
   sock=-1;
   udp_port=0;
   connected=false;
   passive=false;
   duplicate=0;
   myself=false;
   peer_choking=true;
   am_choking=true;
   peer_interested=false;
   am_interested=false;
   upload_only=false;
   peer_complete_pieces=0;
   retry_timer.Stop();
   retry_timer.AddRandom(2);
   choke_timer.Stop();
   interest_timer.Stop();
   last_piece=NO_PIECE;
   peer_bytes_pool[0]=peer_bytes_pool[1]=0;
   peer_recv=peer_sent=0;
   invalid_piece_count=0;
   // The address check must come last: SetError() calls Disconnect(), which
   // reads peer_bytes_pool[] and hands the values back to the parent, so
   // every member has to be initialized before it.
   if(addr.is_reserved() || addr.is_multicast() || addr.port()==0)
      SetError("invalid peer address");
}
// Nothing to release here; the connection is torn down in PrepareToDie().
TorrentPeer::~TorrentPeer()
{
}
// Drop the connection and per-connection state before the peer goes away.
void TorrentPeer::PrepareToDie()
{
   Disconnect();
}

// Adopt an already established connection: socket `s' and receive buffer
// `rb'. The peer is marked connected and passive.
void TorrentPeer::Connect(int s,IOBuffer *rb)
{
   passive=true;
   connected=true;
   sock=s;
   recv_buf=rb;
}

// Put the peer into the fatal error state with message `s', log it and
// disconnect, passing the message on as the disconnect reason.
void TorrentPeer::SetError(const char *s)
{
   error=Error::Fatal(s);
   LogError(11,"fatal error: %s",s);
   Disconnect(s);
}

// Drop the current connection, if any, and stop the retry timer (with a
// random addition of up to 2). An unconnected peer is left untouched.
void TorrentPeer::Restart()
{
   if(Connected()) {
      Disconnect();
      retry_timer.Stop();
      retry_timer.AddRandom(2);
   }
}

// Tear down the connection to the peer and reset all per-connection state:
// request queues, the peer's bitfield (each piece is first un-marked via
// SetPieceHaving), peer id, fast/suggested sets, buffers and the socket.
// The parent's counters of interesting and unchoked peers are corrected,
// choke/interest flags return to their initial values, the retry timer is
// restarted and unused bytes are returned to the parent's rate limiter.
// `dc' is remembered in last_dc when a socket was actually closed.
void TorrentPeer::Disconnect(const char *dc)
{
   Enter();
   if(Connected() && !recv_buf->Eof())
      LogNote(4,"closing connection");
   recv_queue.empty();
   ClearSentQueue();
   if(peer_bitfield) {
      for(unsigned p=0; p<parent->total_pieces; p++)
	 SetPieceHaving(p,false);
      peer_bitfield=0;
   }
   peer_id.unset();
   fast_set.empty();
   suggested_set.empty();
   recv_buf=0;
   send_buf=0;
   if(sock!=-1) {
      close(sock);
      sock=-1;
      connected=false;
      last_dc.set(dc);
   }
   // the flags are subtracted as 0/1 before being reset.
   parent->am_interested_peers_count-=am_interested;
   am_interested=false;
   parent->am_not_choking_peers_count-=!am_choking;
   am_choking=true;
   peer_interested=false;
   peer_choking=true;
   peer_complete_pieces=0;
   retry_timer.Reset();
   choke_timer.Stop();
   interest_timer.Stop();
   // return to main pool
   parent->PeerBytesUsed(-peer_bytes_pool[RateLimit::GET],RateLimit::GET);
   parent->PeerBytesUsed(-peer_bytes_pool[RateLimit::PUT],RateLimit::PUT);
   peer_bytes_pool[0]=peer_bytes_pool[1]=0;
   Leave();
}

// Queue the handshake: one length byte, the protocol string, 8 extension
// bytes, the torrent's info_hash and our peer id.
// The DHT bit of the extension bytes follows the torrent:use-dht setting.
void TorrentPeer::SendHandshake()
{
   static const char protocol[]="BitTorrent protocol";
   const int proto_len=sizeof(protocol)-1;
   send_buf->PackUINT8(proto_len);
   send_buf->Put(protocol,proto_len);

   static char reserved[8] = {
      // reserved[7]&0x01 - DHT Protocol (http://www.bittorrent.org/beps/bep_0005.html)
      // reserved[7]&0x04 - Fast Extension (http://www.bittorrent.org/beps/bep_0006.html)
      // reserved[5]&0x10 - Extension Protocol (http://www.bittorrent.org/beps/bep_0010.html)
      0, 0, 0, 0, 0, 0x10, 0, 0x05,
   };
   const bool use_dht=ResMgr::QueryBool("torrent:use-dht",0);
   if(!use_dht)
      reserved[7]&=~0x01;
   else
      reserved[7]|=0x01;
   send_buf->Put(reserved,8);

   send_buf->Put(parent->info_hash);
   send_buf->Put(parent->my_peer_id);
   LogSend(9,"handshake");
}
// Parse the peer's handshake from recv_buf.
// Expected layout: 1 length byte, the protocol string of that length,
// 8 extension bytes, the info hash (SHA1_DIGEST_SIZE bytes) and the peer id
// (Torrent::PEER_ID_LEN bytes).
// Returns UNPACK_NO_DATA_YET / UNPACK_PREMATURE_EOF while the handshake is
// incomplete, UNPACK_WRONG_FORMAT (after SetError) when the info hash is
// not the parent's, and UNPACK_SUCCESS after consuming the handshake.
// The protocol string itself is only logged, not compared.
TorrentPeer::unpack_status_t TorrentPeer::RecvHandshake()
{
   unsigned proto_len=0;
   if(recv_buf->Size()>0)
      proto_len=recv_buf->UnpackUINT8();

   // wait until the whole handshake is in the buffer.
   if((unsigned)recv_buf->Size()<1+proto_len+8+SHA1_DIGEST_SIZE+Torrent::PEER_ID_LEN)
      return recv_buf->Eof() ? UNPACK_PREMATURE_EOF : UNPACK_NO_DATA_YET;

   // start after the length byte.
   int unpacked=1;
   const char *data=recv_buf->Get();

   xstring protocol(data+unpacked,proto_len);
   unpacked+=proto_len;

   memcpy(extensions,data+unpacked,8);
   unpacked+=8; // 8 bytes are reserved (extensions)

   xstring peer_info_hash(data+unpacked,SHA1_DIGEST_SIZE);
   unpacked+=SHA1_DIGEST_SIZE;
   if(peer_info_hash.ne(parent->info_hash)) {
      LogError(0,"got info_hash: %s, wanted: %s",peer_info_hash.hexdump(),parent->info_hash.hexdump());
      SetError("peer info_hash mismatch");
      return UNPACK_WRONG_FORMAT;
   }

   const xstring& tmp_peer_id=xstring::get_tmp(recv_buf->Get()+unpacked,Torrent::PEER_ID_LEN);
   unpacked+=Torrent::PEER_ID_LEN;
   // if we have already such a peer, then this peer or the other one
   // must be marked as duplicate and then removed in ScanPeers.
   duplicate=parent->FindPeerById(tmp_peer_id);
   // an unconnected peer with the same id is the one marked as duplicate
   // (of this peer); this peer is then not a duplicate itself.
   if(duplicate && !duplicate->Connected()) {
      duplicate->duplicate=this;
      duplicate=0;
   }
   peer_id.set(tmp_peer_id);

   recv_buf->Skip(unpacked);
   LogRecv(4,xstring::format("handshake, %s, peer_id: %s, reserved: %02x%02x%02x%02x%02x%02x%02x%02x",
      protocol.dump(),url::encode(peer_id,"").get(),
      extensions[0],extensions[1],extensions[2],extensions[3],
      extensions[4],extensions[5],extensions[6],extensions[7]));

   return UNPACK_SUCCESS;
}
// Send the extended handshake message, if LTEPExtensionEnabled() allows it.
// The dictionary carries: "m" (message ids for ut_metadata and ut_pex),
// "p" (our port), "v" (client version), "reqq", optionally "upload_only"
// and "metadata_size", our addresses as "ipv4"/"ipv6" and the peer's
// address as "yourip".
void TorrentPeer::SendExtensions()
{
  if(!LTEPExtensionEnabled())
      return;
   xmap_p<BeNode> m;
   m.add("ut_metadata",new BeNode(MSG_EXT_METADATA));
   m.add("ut_pex",new BeNode(MSG_EXT_PEX));
   xmap_p<BeNode> ext;
   ext.add("m",new BeNode(&m));
   ext.add("p",new BeNode(parent->GetPort()));
   ext.add("v",new BeNode(PACKAGE "/" VERSION));
   ext.add("reqq",new BeNode(MAX_QUEUE_LEN*16));
   if(parent->Complete())
      ext.add("upload_only",new BeNode(1));
   if(parent->metadata)
      ext.add("metadata_size",new BeNode(parent->metadata.length()));

   // our IPv4 address: the torrent:ip setting when it parses, otherwise
   // the local address of the socket when that is an IPv4 one.
   const char *ip=ResMgr::Query("torrent:ip",0);
   sockaddr_u sa;
   socklen_t sa_len=sizeof(sa);
   if((ip && ip[0] && inet_aton(ip,&sa.in.sin_addr))
   || (getsockname(sock,&sa.sa,&sa_len)!=-1 && sa.sa.sa_family==AF_INET))
      ext.add("ipv4",new BeNode((const char*)&sa.in.sin_addr,4));

#if INET6
   // same for IPv6, with the torrent:ipv6 setting.
   const char *ipv6=ResMgr::Query("torrent:ipv6",0);
   sa_len=sizeof(sa);
   if((ipv6 && ipv6[0] && inet_pton(AF_INET6,ipv6,&sa.in6.sin6_addr)>0)
   || (getsockname(sock,&sa.sa,&sa_len)!=-1 && sa.sa.sa_family==AF_INET6))
      ext.add("ipv6",new BeNode((const char*)&sa.in6.sin6_addr,16));
#endif

   // the address of the other end of the connection.
   sa_len=sizeof(sa);
   if(getpeername(sock,&sa.sa,&sa_len)!=-1) {
      if(sa.sa.sa_family==AF_INET)
	 ext.add("yourip",new BeNode((const char*)&sa.in.sin_addr,4));
#if INET6
      else if(sa.sa.sa_family==AF_INET6)
         ext.add("yourip",new BeNode((const char*)&sa.in6.sin6_addr,16));
#endif
   }

   PacketExtended pkt(MSG_EXT_HANDSHAKE,new BeNode(&ext));
   pkt.Pack(send_buf);
   LogSend(9,xstring::format("extended(%u,%s)",pkt.code,pkt.data->Format1()));
}

void TorrentPeer::SendDataReply()
{
   const PacketRequest *p=recv_queue.next();
   Enter(parent);
   const xstring& data=parent->RetrieveBlock(p->index,p->begin,p->req_length);
   Leave(parent);
   if(!Connected()) // we can be disconnected by parent
      return;
   if(data.length()!=p->req_length) {
      if(parent->my_bitfield->get_bit(p->index))
	 parent->SetError(xstring::format("failed to read piece %u",p->index));
      return;
   }
   LogSend(8,xstring::format("piece:%u begin:%u size:%u",p->index,p->begin,p->req_length));
   PacketPiece(p->index,p->begin,data).Pack(send_buf);
   peer_sent+=data.length();
   peer_send_rate.Add(data.length());
   parent->AccountSend(p->index,data.length());
   BytesPut(data.length());
   activity_timer.Reset();
}

int TorrentPeer::SendDataRequests(unsigned p)
{
   if(p==NO_PIECE)
      return 0;
   if(parent->my_bitfield->get_bit(p)
   || !peer_bitfield || !peer_bitfield->get_bit(p))
      return 0;

   int sent=0;
   unsigned blocks=parent->BlocksInPiece(p);
   unsigned bytes_allowed=BytesAllowed(RateLimit::GET);
   for(unsigned b=0; b<blocks; b++) {
      if(parent->BlockPresent(p,b))
	 continue;
      if(parent->piece_info[p].downloader_for(b)) {
	 if(!parent->end_game)
	    continue;
	 if(parent->piece_info[p].downloader_for(b)==this)
	    continue;
	 if(FindRequest(p,b*Torrent::BLOCK_SIZE)>=0)
	    continue;
      }

      unsigned begin=b*Torrent::BLOCK_SIZE;
      unsigned len=Torrent::BLOCK_SIZE;

      if(b==blocks-1) {
	 assert(begin<parent->PieceLength(p));
	 unsigned max_len=parent->PieceLength(p)-begin;
	 if(len>max_len)
	    len=max_len;
      }

      if(bytes_allowed<len)
	 break;

      parent->SetDownloader(p,b,0,this);
      PacketRequest *req=new PacketRequest(p,b*Torrent::BLOCK_SIZE,len);
      LogSend(6,xstring::format("request piece:%u begin:%u size:%u",p,b*Torrent::BLOCK_SIZE,len));
      req->Pack(send_buf);
      sent_queue.push(req);
      SetLastPiece(p);
      sent++;
      activity_timer.Reset();
      bytes_allowed-=len;
      BytesGot(len);

      if(sent_queue.count()>=MAX_QUEUE_LEN)
	 break;
   }
   return sent;
}

// Tell whether piece `p` is currently listed in fast_set.
bool TorrentPeer::InFastSet(unsigned p) const
{
   int i=fast_set.count();
   while(i-->0) {
      if(fast_set[i]==p)
	 return true;
   }
   return false;
}

// Choose a piece and request blocks of it from this peer.
// Order of preference: the piece in progress, pieces suggested by the
// peer, then the parent's pieces_needed list.  While the peer is choking
// only fast-set pieces are tried.  Interest is dropped when the peer has
// nothing from pieces_needed and interest_timer has stopped.
void TorrentPeer::SendDataRequests()
{
   assert(am_interested);

   if(peer_choking && !FastExtensionEnabled())
      return;
   if(sent_queue.count()>=MAX_QUEUE_LEN)
      return;
   if(!BytesAllowedToGet(Torrent::BLOCK_SIZE))
      return;

   if(peer_choking) {
      // keep going with the current piece when it is in the fast set
      const unsigned resume=GetLastPiece();
      if(resume!=NO_PIECE && InFastSet(resume) && SendDataRequests(resume)>0)
	 return;
      // otherwise walk the fast set, discarding entries that yield nothing
      for(; fast_set.count()>0; fast_set.next()) {
	 if(SendDataRequests(fast_set[0])>0)
	    return;
      }
      return;
   }

   // continue with the piece in progress
   if(SendDataRequests(GetLastPiece())>0)
      return;

   // pieces the peer suggested
   while(suggested_set.count()>0) {
      if(SendDataRequests(suggested_set.next())>0)
	 return;
   }

   // pick a new piece from the needed list
   unsigned candidate=NO_PIECE;
   for(int i=0; i<parent->pieces_needed.count(); i++) {
      const unsigned piece=parent->pieces_needed[i];
      if(!peer_bitfield->get_bit(piece))
	 continue;
      candidate=piece;
      if(parent->my_bitfield->get_bit(piece))
	 continue;
      // add some randomness, so that different instances don't synchronize
      if(parent->AllBlocksAbsent(piece) && random()/13%16==0)
	 continue;
      if(SendDataRequests(piece)>0)
	 return;
   }
   if(candidate==NO_PIECE && interest_timer.Stopped())
      SetAmInterested(false);
}

// Queue a "have" message for piece `p`; ignored when there is no send buffer.
void TorrentPeer::Have(unsigned p)
{
   if(send_buf) {
      Enter();
      LogSend(9,xstring::format("have(%u)",p));
      PacketHave(p).Pack(send_buf);
      Leave();
   }
}
// Look up an outstanding request in sent_queue by piece index and byte
// offset.  Returns its position in the queue, or -1 when not found.
int TorrentPeer::FindRequest(unsigned piece,unsigned begin) const
{
   int pos=0;
   while(pos<sent_queue.count()) {
      const PacketRequest *req=sent_queue[pos];
      const bool match=(req->index==piece && req->begin==begin);
      if(match)
	 return pos;
      pos++;
   }
   return -1;
}
// Cancel the outstanding request for piece `p` at byte offset `b`, if any:
// send a cancel message, release the block's downloader slot and remove
// the request from sent_queue.  Ignored when there is no send buffer.
void TorrentPeer::CancelBlock(unsigned p,unsigned b)
{
   if(!send_buf)
      return;
   Enter();
   const int pos=FindRequest(p,b);
   if(pos<0) {
      Leave();
      return;
   }
   const PacketRequest *req=sent_queue[pos];
   LogSend(9,xstring::format("cancel(%u,%u)",p,b));
   PacketCancel(p,b,req->req_length).Pack(send_buf);
   // `b` is a byte offset; the downloader table is indexed by block number
   parent->SetDownloader(p,b/Torrent::BLOCK_SIZE,this,0);
   sent_queue.remove(pos);
   Leave();
}

// Record that this peer supplied data for piece `p` which turned out to be
// invalid: count it, forget that the peer has the piece, re-evaluate our
// interest, and blacklist the peer for "1d" after more than 5 such pieces.
void TorrentPeer::MarkPieceInvalid(unsigned p)
{
   ++invalid_piece_count;
   SetPieceHaving(p,false);
   // SetAmInterested forces the interest off once the limit is exceeded
   SetAmInterested(am_interested);
   const bool too_many=(invalid_piece_count>5);
   if(too_many)
      parent->BlackListPeer(this,"1d");
}

// Drop the request at position `i` of sent_queue, returning its bytes to
// the parent and releasing the block's downloader slot.  Without the Fast
// Extension all requests up to and including `i` are dropped.
// A negative `i` is a no-op.
void TorrentPeer::ClearSentQueue(int i)
{
   if(i<0)
      return;

   if(FastExtensionEnabled()) {
      // replies can come out of order: remove exactly this request
      const PacketRequest *req=sent_queue[i];
      parent->PeerBytesGot(-req->req_length);
      parent->SetDownloader(req->index,req->begin/Torrent::BLOCK_SIZE,this,0);
      sent_queue.remove(i);
      return;
   }

   // without Fast Extension we assume sequential packet processing,
   // thus clear also all sent requests before this one.
   for(; i>=0; i--) {
      const PacketRequest *req=sent_queue.next();
      parent->PeerBytesGot(-req->req_length);
      parent->SetDownloader(req->index,req->begin/Torrent::BLOCK_SIZE,this,0);
   }
}

// Return how many bytes this peer may transfer in direction `dir`:
// the peer's private pool plus what the parent still allows.
// The private pool is first topped up from the parent's allowance,
// up to a target of two blocks.
int TorrentPeer::BytesAllowed(RateLimit::dir_t dir)
{
   int from_parent=parent->PeerBytesAllowed(this,dir);
   const int pool_target=Torrent::BLOCK_SIZE*2;

   if(peer_bytes_pool[dir]<pool_target) {
      int refill=pool_target-peer_bytes_pool[dir];
      if(refill>from_parent)
	 refill=from_parent;
      peer_bytes_pool[dir]+=refill;
      from_parent-=refill;
      // the moved bytes are now accounted as used in the parent
      parent->PeerBytesUsed(refill,dir);
   }
   return peer_bytes_pool[dir]+from_parent;
}
// Tell whether `bytes` may be transferred in direction `dir` right now.
// When they may not, TimeoutS(1) is called before returning false.
bool TorrentPeer::BytesAllowed(RateLimit::dir_t dir,unsigned bytes)
{
   const int allowed=BytesAllowed(dir);
   if(bytes>(unsigned)allowed) {
      TimeoutS(1);
      return false;
   }
   return true;
}
// Account `b` transferred bytes in direction `dir`: take them from the
// peer's private pool first and charge the remainder to the parent.
void TorrentPeer::BytesUsed(int b,RateLimit::dir_t dir)
{
   if(peer_bytes_pool[dir]<b) {
      // pool is too small: drain it and pass the rest up
      const int excess=b-peer_bytes_pool[dir];
      peer_bytes_pool[dir]=0;
      parent->PeerBytesUsed(excess,dir);
      return;
   }
   peer_bytes_pool[dir]-=b;
}

// Choose the piece to continue downloading from this peer.
// A candidate must be set, incomplete locally and available from the peer.
// Preference: this peer's last piece if some of its blocks are present,
// then the parent's last piece, then this peer's last piece regardless.
// Returns NO_PIECE when no candidate qualifies or there is no peer bitfield.
unsigned TorrentPeer::GetLastPiece() const
{
   if(!peer_bitfield)
      return NO_PIECE;

   const unsigned own=last_piece;
   const bool own_usable=(own!=NO_PIECE
      && !parent->my_bitfield->get_bit(own)
      && peer_bitfield->get_bit(own));

   // continue if have any blocks already
   if(own_usable && parent->AnyBlocksPresent(own))
      return own;

   const unsigned shared=parent->last_piece;
   if(shared!=NO_PIECE
   && !parent->my_bitfield->get_bit(shared)
   && peer_bitfield->get_bit(shared))
      return shared;

   return own_usable ? own : NO_PIECE;
}

// Remember `p` as the piece in progress, for this peer and for the parent,
// but only where the remembered piece is unset or already complete.
void TorrentPeer::SetLastPiece(unsigned p)
{
   const bool own_done=(last_piece==NO_PIECE
      || parent->my_bitfield->get_bit(last_piece));
   if(own_done)
      last_piece=p;

   const bool shared_done=(parent->last_piece==NO_PIECE
      || parent->my_bitfield->get_bit(parent->last_piece));
   if(shared_done)
      parent->last_piece=p;
}

// Change our interest in this peer and tell the peer about it.
// Interest is forced off for a peer with more than 5 invalid pieces.
// No message is sent when the state does not change.
void TorrentPeer::SetAmInterested(bool interest)
{
   const bool wanted=(invalid_piece_count>5 ? false : interest);
   if(wanted==am_interested)
      return;

   Enter();
   LogSend(6,wanted?"interested":"uninterested");
   Packet(wanted?MSG_INTERESTED:MSG_UNINTERESTED).Pack(send_buf);

   // keep the parent's count of peers we are interested in up to date
   parent->am_interested_peers_count+=(wanted-am_interested);
   am_interested=wanted;
   interest_timer.Reset();
   if(am_interested)
      parent->am_interested_timer.Reset();

   (void)BytesAllowed(RateLimit::GET); // draw some bytes from the common pool
   Leave();
}
void TorrentPeer::SetAmChoking(bool c)
{
   if(am_choking==c)
      return;
   Enter();
   LogSend(6,c?"choke":"unchoke");
   Packet(c?MSG_CHOKE:MSG_UNCHOKE).Pack(send_buf);
   parent->am_not_choking_peers_count-=(c-am_choking);
   am_choking=c;
   choke_timer.Reset();
   if(am_choking) {
      if(!FastExtensionEnabled()) {
	 recv_queue.empty();
      } else {
	 // send rejects
	 while(recv_queue.count()>0) {
	    const PacketRequest *p=recv_queue.next();
	    LogSend(6,xstring::format("reject-request piece:%u begin:%u size:%u",p->index,p->begin,p->req_length));
	    PacketRejectRequest(p->index,p->begin,p->req_length).Pack(send_buf);
	 }
      }
   }
   Leave();
}

// Record whether the peer has piece `p`, updating the piece's source count
// and the peer's count of complete pieces.  A piece left without sources
// is reported to the parent via SetPieceNotWanted.  We become interested
// when the peer gains a piece we lack and the parent needs more uploaders.
void TorrentPeer::SetPieceHaving(unsigned p,bool have)
{
   const bool had=peer_bitfield->get_bit(p);
   if(had==have)
      return;

   const int diff=(have-had);
   parent->piece_info[p].add_sources_count(diff);
   peer_complete_pieces+=diff;
   peer_bitfield->set_bit(p,have);

   if(parent->piece_info[p].get_sources_count()==0)
      parent->SetPieceNotWanted(p);

   if(!have || !send_buf || am_interested)
      return;
   if(parent->my_bitfield->get_bit(p))
      return;
   if(parent->NeedMoreUploaders()) {
      SetAmInterested(true);
      SetLastPiece(p);
   }
}

// Process one received peer message and dispose of the packet.
// `p` is deleted at the end unless it was handed over to recv_queue
// (MSG_REQUEST), in which case the local pointer is cleared first.
// Messages that refer to pieces are ignored until the torrent has metadata;
// malformed ones put the peer into error state via SetError.
void TorrentPeer::HandlePacket(Packet *p)
{
   switch(p->GetPacketType())
   {
   case MSG_KEEPALIVE: {
	 LogRecv(5,"keep-alive");
	 break;
      }
   case MSG_CHOKE: {
	 LogRecv(5,"choke");
	 peer_choking=true;
	 ClearSentQueue(); // discard pending requests
	 break;
      }
   case MSG_UNCHOKE: {
	 LogRecv(5,"unchoke");
	 peer_choking=false;
	 if(am_interested)
	    SendDataRequests();
	 break;
      }
   case MSG_INTERESTED: {
	 LogRecv(5,"interested");
	 peer_interested=true;
	 break;
      }
   case MSG_UNINTERESTED: {
	 LogRecv(5,"uninterested");
	 recv_queue.empty();
	 peer_interested=false;
	 break;
      }
   case MSG_HAVE: {
	 if(!parent->HasMetadata())
	    break;
	 PacketHave *pp=static_cast<PacketHave*>(p);
	 LogRecv(5,xstring::format("have(%u)",pp->piece));
	 if(pp->piece>=parent->total_pieces) {
	    SetError("invalid piece index");
	    break;
	 }
	 SetPieceHaving(pp->piece,true);
	 break;
      }
   case MSG_BITFIELD: {
	 if(!parent->HasMetadata())
	    break;
	 PacketBitField *pp=static_cast<PacketBitField*>(p);
	 // one bit per piece, rounded up to whole bytes; rounding down would
	 // accept a bitfield too short to hold the last pieces, and the loop
	 // below would then read bits past the received data.
	 const unsigned expected_bytes=(parent->total_pieces+7)/8;
	 if(pp->bitfield->count()<(int)expected_bytes) {
	    LogError(9,"bitfield length %d, expected %u",pp->bitfield->count(),expected_bytes);
	    SetError("invalid bitfield length");
	    break;
	 }
	 if(pp->bitfield->has_any_set(parent->total_pieces,pp->bitfield->get_bit_length())) {
	    SetError("bitfield has spare bits set");
	    break;
	 }
	 for(unsigned p=0; p<parent->total_pieces; p++)
	    SetPieceHaving(p,pp->bitfield->get_bit(p));
	 LogRecv(5,xstring::format("bitfield(%u/%u)",peer_complete_pieces,parent->total_pieces));
	 break;
      }
   case MSG_PORT: {
	 PacketPort *pp=static_cast<PacketPort*>(p);
	 LogRecv(5,xstring::format("port(%u)",pp->port));
	 udp_port=pp->port;
	 if(DHT_Enabled() && Torrent::dht) {
	    // ping the peer's DHT node at the announced UDP port
	    sockaddr_u a(addr);
	    a.set_port(udp_port);
	    Torrent::GetDHT(a)->SendPing(a);
	 }
	 break;
      }
   case MSG_HAVE_ALL: {
	 LogRecv(5,"have-all");
	 if(!FastExtensionEnabled()) {
	    SetError("fast extension is disabled");
	    break;
	 }
	 if(!parent->HasMetadata())
	    break;
	 for(unsigned p=0; p<parent->total_pieces; p++)
	    SetPieceHaving(p,1);
	 break;
      }
   case MSG_HAVE_NONE: {
	 LogRecv(5,"have-none");
	 if(!FastExtensionEnabled()) {
	    SetError("fast extension is disabled");
	    break;
	 }
	 if(!parent->HasMetadata())
	    break;
	 for(unsigned p=0; p<parent->total_pieces; p++)
	    SetPieceHaving(p,0);
	 break;
      }
   case MSG_SUGGEST_PIECE: {
	 PacketSuggestPiece *pp=static_cast<PacketSuggestPiece*>(p);
	 LogRecv(5,xstring::format("suggest-piece:%u",pp->piece));
	 if(!FastExtensionEnabled()) {
	    SetError("fast extension is disabled");
	    break;
	 }
	 if(!parent->HasMetadata())
	    break;
	 if(pp->piece>=parent->total_pieces) {
	    SetError("invalid piece index");
	    break;
	 }
	 suggested_set.push(pp->piece);
	 break;
      }
   case MSG_ALLOWED_FAST: {
	 PacketAllowedFast *pp=static_cast<PacketAllowedFast*>(p);
	 LogRecv(5,xstring::format("allowed-fast:%u",pp->piece));
	 if(!FastExtensionEnabled()) {
	    SetError("fast extension is disabled");
	    break;
	 }
	 if(!parent->HasMetadata())
	    break;
	 if(pp->piece>=parent->total_pieces) {
	    SetError("invalid piece index");
	    break;
	 }
	 fast_set.push(pp->piece);
	 break;
      }
   case MSG_REJECT_REQUEST: {
	 PacketRejectRequest *pp=static_cast<PacketRejectRequest*>(p);
	 LogRecv(5,xstring::format("reject-request(%u,%u)",pp->index,pp->begin));
	 if(!FastExtensionEnabled()) {
	    SetError("fast extension is disabled");
	    break;
	 }
	 int i=FindRequest(pp->index,pp->begin);
	 if(i>=0)
	    ClearSentQueue(i);
	 break;
      }
   case MSG_EXTENDED: {
	 PacketExtended *pp=static_cast<PacketExtended*>(p);
	 LogRecv(9,xstring::format("extended(%u,%s)",pp->code,pp->data->Format1()));
	 HandleExtendedMessage(pp);
	 break;
      }
   case MSG_PIECE: {
	 PacketPiece *pp=static_cast<PacketPiece*>(p);
	 LogRecv(7,xstring::format("piece:%u begin:%u size:%u",pp->index,pp->begin,(unsigned)pp->data.length()));
	 if(!parent->HasMetadata())
	    break;
	 if(pp->index>=parent->total_pieces) {
	    SetError("invalid piece index");
	    break;
	 }
	 if(pp->begin>=parent->PieceLength(pp->index)) {
	    SetError("invalid data offset");
	    break;
	 }
	 if(pp->begin+pp->data.length() > parent->PieceLength(pp->index)) {
	    SetError("invalid data length");
	    break;
	 }
	 int i=FindRequest(pp->index,pp->begin);
	 if(i<0) {
	    // data we did not request (or no longer wait for) is dropped silently
// 	    SetError("got a piece that was not requested");
	    break;
	 }
	 ClearSentQueue(i);
	 parent->PeerBytesGot(pp->data.length()); // re-take the bytes returned by ClearSentQueue
	 Enter(parent);
	 parent->StoreBlock(pp->index,pp->begin,pp->data.length(),pp->data.get(),this);
	 Leave(parent);

	 int len=pp->data.length();
	 peer_recv+=len;
	 peer_recv_rate.Add(len);
	 parent->AccountRecv(pp->index,len);

	 // request another block from the same piece
	 if(am_interested && (!peer_choking || InFastSet(pp->index)))
	    SendDataRequests(pp->index);
	 break;
      }
   case MSG_REQUEST: {
	 PacketRequest *pp=static_cast<PacketRequest*>(p);
	 LogRecv(5,xstring::format("request for piece:%u begin:%u size:%u",pp->index,pp->begin,pp->req_length));
	 if(pp->req_length>Torrent::BLOCK_SIZE*2) {
	    SetError("too large request");
	    break;
	 }
	 if(!parent->HasMetadata())
	    break;
	 if(am_choking)
	    break;
	 if(pp->index>=parent->total_pieces) {
	    SetError("invalid piece index");
	    break;
	 }
	 if(pp->begin>=parent->PieceLength(pp->index)) {
	    SetError("invalid data offset");
	    break;
	 }
	 if(pp->begin+pp->req_length > parent->PieceLength(pp->index)) {
	    SetError("invalid data length");
	    break;
	 }
	 if(recv_queue.count()>=MAX_QUEUE_LEN*16) {
	    SetError("too many requests");
	    break;
	 }
	 recv_queue.push(pp);
	 activity_timer.Reset();
	 // the packet is now held by recv_queue; don't delete it below
	 p=0;
	 break;
      }
   case MSG_CANCEL: {
	 PacketCancel *pp=static_cast<PacketCancel*>(p);
	 LogRecv(5,xstring::format("cancel(%u,%u)",pp->index,pp->begin));
	 for(int i=0; i<recv_queue.count(); i++) {
	    const PacketRequest *req=recv_queue[i];
	    if(req->index==pp->index && req->begin==pp->begin) {
	       recv_queue.remove(i);
	       break;
	    }
	 }
	 break;
      }
   }
   delete p;
}

void Torrent::MetadataDownloaded()
{
   xstring new_info_hash;
   SHA1(md_download,new_info_hash);
   if(info_hash && info_hash.ne(new_info_hash)) {
      LogError(1,"downloaded metadata does not match info_hash, retrying");
      StartMetadataDownload();
      return;
   }
   if(SetMetadata(md_download))
      Startup();
   md_download.unset();
}

void TorrentPeer::SendMetadataRequest()
{
   if(!msg_ext_metadata || !parent->md_download || parent->md_download.length()>=metadata_size
   || parent->md_download.length()%Torrent::BLOCK_SIZE)
      return;
   xmap_p<BeNode> req;
   req.add("msg_type",new BeNode(UT_METADATA_REQUEST));
   req.add("piece",new BeNode(parent->md_download.length()/Torrent::BLOCK_SIZE));
   PacketExtended pkt(msg_ext_metadata,new BeNode(&req));
   LogSend(4,xstring::format("ut_metadata request %s",pkt.data->Format1()));
   pkt.Pack(send_buf);
}

// Process an extension-protocol message: the extended handshake,
// ut_metadata (request/data/reject) or ut_pex.
// The payload must be a bencoded dictionary, otherwise the peer is put
// into error state.  Messages with other codes are ignored.
void TorrentPeer::HandleExtendedMessage(PacketExtended *pp)
{
   if(pp->data->type!=BeNode::BE_DICT) {
      SetError("extended type must be DICT");
      return;
   }
   if(pp->code==MSG_EXT_HANDSHAKE) {
      // learn the message ids the peer assigned to the extensions
      BeNode *m=pp->data->lookup("m",BeNode::BE_DICT);
      if(m) {
	 msg_ext_metadata=m->lookup_int("ut_metadata");
	 msg_ext_pex=m->lookup_int("ut_pex");
      }
      metadata_size=parent->metadata_size=pp->data->lookup_int("metadata_size");
      upload_only=pp->data->lookup_int("upload_only");

      if(!parent->HasMetadata() && !msg_ext_metadata) {
	 Disconnect("peer cannot provide metadata");
	 return;
      }

      const xstring& v=pp->data->lookup_str("v");
      if(v)
	 LogNote(3,"peer version is %s",v.get());

      const xstring& myip=pp->data->lookup_str("yourip");
      if(myip && myip.length()==4) {
	 char ip[16];
	 inet_ntop(AF_INET,myip.get(),ip,sizeof(ip));
	 LogNote(5,"my external IPv4 is %s",ip);
      }
      if(passive) {
	 // use specified port number to connect back
	 int p=pp->data->lookup_int("p");
	 if(p && p>=1024 && p<=65535) {
	    LogNote(9,"using port %d to connect back",p);
	    addr.set_port(p);
	    passive=false;
	    // check the black list
	    if(Torrent::BlackListed(this)) {
	       SetError("blacklisted");
	       return;
	    }
	    // check for duplicates
	    TaskRefArray<TorrentPeer>& peers=parent->peers;
	    for(int i=0; i<peers.count(); i++) {
	       if(peers[i]!=this && peers[i]->AddressEq(this)) {
		  if(!peers[i]->Connected())
		     peers[i]->duplicate=this;
		  else
		     duplicate=peers[i].get_non_const();
		  return;
	       }
	    }
	 }
      }
      if(msg_ext_metadata && parent->md_download)
	 SendMetadataRequest();
   } else if(pp->code==MSG_EXT_METADATA) {
      BeNode *msg_type=pp->data->lookup("msg_type",BeNode::BE_INT);
      if(!msg_type) {
	 SetError("ut_metadata msg_type bad or missing");
	 return;
      }
      BeNode *piece=pp->data->lookup("piece",BeNode::BE_INT);
      // the upper bound keeps the byte offset computed below within int range
      if(!piece || piece->num<0 || piece->num>=INT_MAX/Torrent::BLOCK_SIZE) {
	 SetError("ut_metadata piece bad or missing");
	 return;
      }
      size_t offset=piece->num*Torrent::BLOCK_SIZE;
      xmap_p<BeNode> reply;
      switch(msg_type->num) {
	 case UT_METADATA_REQUEST: {
	    // reject pieces that start at or past the end of our metadata;
	    // this also covers the case when we have no metadata at all,
	    // instead of answering with an empty data message.
	    if(offset>=parent->metadata.length()) {
	       reply.add("msg_type",new BeNode(UT_METADATA_REJECT));
	       reply.add("piece",new BeNode(piece->num));
	       PacketExtended pkt(msg_ext_metadata,new BeNode(&reply));
	       LogSend(4,xstring::format("ut_metadata reject %s",pkt.data->Format1()));
	       pkt.Pack(send_buf);
	       break;
	    }
	    const char *d=parent->metadata+offset;
	    unsigned len=parent->metadata.length()-offset;
	    if(len>Torrent::BLOCK_SIZE)
	       len=Torrent::BLOCK_SIZE;
	    reply.add("msg_type",new BeNode(UT_METADATA_DATA));
	    reply.add("piece",new BeNode(piece->num));
	    reply.add("total_size",new BeNode(parent->metadata.length()));
	    PacketExtended pkt(msg_ext_metadata,new BeNode(&reply));
	    LogSend(4,xstring::format("ut_metadata data %s",pkt.data->Format1()));
	    // the raw metadata bytes follow the bencoded dictionary
	    pkt.SetAppendix(d,len);
	    pkt.Pack(send_buf);
	    break;
	 }
	 case UT_METADATA_DATA: {
	    if(parent->md_download) {
	       // accept only the piece that continues the download buffer
	       if(offset==parent->md_download.length()) {
		  BeNode *b_size=pp->data->lookup("total_size",BeNode::BE_INT);
		  if(b_size) {
		     if(metadata_size && metadata_size!=(size_t)b_size->num) {
			SetError("metadata_size mismatch with total_size");
			return;
		     }
		     metadata_size=b_size->num;
		     parent->metadata_size=metadata_size;
		  }
		  parent->md_download.append(pp->appendix);
		  // the download is complete on a short piece, or when the
		  // announced size has been reached; the latter is the only
		  // signal when the size is an exact multiple of the block
		  // size, since no further piece is requested then.
		  if(pp->appendix.length()<Torrent::BLOCK_SIZE
		  || (metadata_size && parent->md_download.length()>=metadata_size))
		     parent->MetadataDownloaded();
	       }
	       SendMetadataRequest();
	    }
	    break;
	 }
	 case UT_METADATA_REJECT:
	    break;
	 default:
	    SetError("ut_metadata msg_type invalid value");
	    return;
      }
   } else if(pp->code==MSG_EXT_PEX) {
      // ignore PEX messages arriving before recv_timer has stopped
      if(!pex.recv_timer.Stopped())
	 return;
      pex.recv_timer.Reset();
      BeNode *added=pp->data->lookup("added",BeNode::BE_STR);
      BeNode *added6=pp->data->lookup("added6",BeNode::BE_STR);
      BeNode *added_f=pp->data->lookup("added.f",BeNode::BE_STR);
      BeNode *added6_f=pp->data->lookup("added6.f",BeNode::BE_STR);
      // compact address sizes: 4+2 bytes for IPv4, 16+2 bytes for IPv6
      AddPEXPeers(added,added_f,6);
      AddPEXPeers(added6,added6_f,18);
   }
}
// Add peers received in a PEX message to the parent torrent.
//   added     - string of compact addresses, `addr_size` bytes each (may be 0)
//   added_f   - optional string with one flag byte per address
//   addr_size - 6 for IPv4 entries, 18 for IPv6 entries
// At most 50 addresses are taken from one message.  Peers not flagged
// connectable are skipped, as are seeds when our torrent is complete and
// addresses not compatible with this peer's address.
void TorrentPeer::AddPEXPeers(BeNode *added,BeNode *added_f,int addr_size)
{
   if(!added)
      return;

   const char *data=added->str;
   const unsigned total=added->str.length()/addr_size;

   // flags are usable only when there is exactly one byte per address.
   // Compare against the full count, before applying the cap below,
   // so that the flags of a large message are not thrown away.
   const char *flags=0;
   if(added_f && added_f->str.length()==total)
      flags=added_f->str;

   const unsigned n=(total>50?50:total);

   int peers_count=0;
   for(unsigned i=0; i<n; data+=addr_size, i++) {
      // without flags every peer is assumed connectable
      unsigned char f=(flags?flags[i]:pex.CONNECTABLE);
      if(!(f&pex.CONNECTABLE))
	 continue;
      if(parent->Complete() && (f&pex.SEED))
	 continue;
      sockaddr_u a;
      a.set_compact(data,addr_size);
      if(!a.is_compatible(this->addr))
	 continue;
      parent->AddPeer(new TorrentPeer(parent,&a,TR_PEX));
      peers_count++;
   }
   if(peers_count>0)
      LogNote(4,"%d %s peers added from PEX message",peers_count,addr_size==6?"ipv4":"ipv6");
}
// Send a ut_pex message describing how the set of connected peers changed
// since the previous message to this peer.  pex.sent holds the addresses
// announced so far.  At most 50 peers are added and 50 dropped per message;
// the rest is left for later messages.  Nothing is sent when the peer does
// not support ut_pex, the torrent is private, or there are no changes.
void TorrentPeer::SendPEXPeers()
{
   pex.send_timer.Reset();
   if(!msg_ext_pex || parent->Private())
      return;

   // pex.sent is rebuilt from scratch; what remains in old_sent after the
   // scan below are the peers that are no longer connected.
   xmap<char> old_sent;
   old_sent.move_here(pex.sent);

   int peer_count=0;
   xstring added;
   xstring added6;
   xstring added_f;
   xstring added6_f;
   xstring dropped;
   xstring dropped6;
   int a=0,a6=0,d=0,d6=0;
   for(int i=parent->peers.count(); i>0; i--) {
      const TorrentPeer *peer=parent->peers[i-1];
      if(!peer->Connected() || peer->IsPassive() || peer->Failed()
      || !peer->addr.is_compatible(this->addr) || peer==this || peer->myself)
	 continue;
      const xstring& ca=peer->addr.compact();
      unsigned char f=pex.CONNECTABLE;
      if(peer->Seed())
	 f|=pex.SEED;
      if(old_sent.exists(ca)) {
	 // already announced and still connected: keep remembering it.
	 // Without this the peer would be forgotten, announced again by
	 // a later message and never reported as dropped.
	 old_sent.remove(ca);
	 pex.sent.add(ca,f);
	 continue;
      }
      peer_count++;
      if(peer_count>50)
	 continue;   // not recorded in pex.sent, so it is announced later
      if(ca.length()==6) {
	 added.append(ca);
	 added_f.append(f);
	 a++;
      } else {
	 added6.append(ca);
	 added6_f.append(f);
	 a6++;
      }
      pex.sent.add(ca,f);
   }
   peer_count=0;
   for(old_sent.each_begin(); !old_sent.each_finished(); old_sent.each_next())
   {
      const xstring& ca=old_sent.each_key();
      peer_count++;
      if(peer_count>50) {
	 // drop it later
	 pex.sent.add(ca,0);
	 continue;
      }
      if(ca.length()==6) {
	 dropped.append(ca);
	 d++;
      } else {
	 dropped6.append(ca);
	 d6++;
      }
   }
   if(a+a6+d+d6==0)
      return;
   xmap_p<BeNode> req;
   if(a) {
      req.add("added",new BeNode(added));
      req.add("added.f",new BeNode(added_f));
   }
   if(a6) {
      req.add("added6",new BeNode(added6));
      req.add("added6.f",new BeNode(added6_f));
   }
   if(d)
      req.add("dropped",new BeNode(dropped));
   if(d6)
      req.add("dropped6",new BeNode(dropped6));
   PacketExtended pkt(msg_ext_pex,new BeNode(&req));
   LogSend(4,xstring::format("ut_pex message: added=[%d,%d], dropped=[%d,%d]",a,a6,d,d6));
   pkt.Pack(send_buf);
}

// Tell whether the peer can give us anything: either a piece to continue
// (GetLastPiece) or any piece from the parent's pieces_needed list.
// Always false while the peer's bitfield is not allocated.
bool TorrentPeer::HasNeededPieces()
{
   if(!peer_bitfield)
      return false;
   if(GetLastPiece()!=NO_PIECE)
      return true;

   int i=0;
   while(i<parent->pieces_needed.count()) {
      if(peer_bitfield->get_bit(parent->pieces_needed[i]))
	 return true;
      i++;
   }
   return false;
}

// One step of the peer connection state machine.
// In order: create the socket and connect, set up the I/O buffers and send
// our handshake, receive the peer's handshake and send the initial
// messages, then do the periodic work (keep-alive, timeout check,
// interest/choke updates, serving requests, PEX) and finally unpack and
// handle at most one received packet.
// Returns MOVED when something happened during the step, otherwise `m`,
// which starts as STALL and becomes MOVED once progress is recorded.
int TorrentPeer::Do()
{
   int m=STALL;
   if(error || myself)
      return m;
   if(sock==-1) {
      // no connection yet: passive peers are not connected to, and
      // nothing is started before retry_timer stops or while the parent
      // is validating
      if(passive)
	 return m;
      if(!retry_timer.Stopped())
	 return m;
      if(parent->IsValidating())
	 return m;
      sock=SocketCreateTCP(addr.sa.sa_family,0);
      if(sock==-1)
      {
	 if(NonFatalError(errno))
	    return m;
	 SetError(xstring::format(_("cannot create socket of address family %d"),addr.sa.sa_family));
	 return MOVED;
      }
      LogNote(4,_("Connecting to peer %s port %u"),SocketNumericAddress(&addr),SocketPort(&addr));
      connected=false;
   }
   if(!connected) {
      int res=SocketConnect(sock,&addr);
      if(res==-1 && errno!=EINPROGRESS && errno!=EALREADY && errno!=EISCONN)
      {
	 // save errno before the calls below
	 int e=errno;
	 const char *error=strerror(e);
	 LogError(4,"connect(%s): %s\n",GetName(),error);
	 Disconnect(error);
	 // a non-serious failure is not turned into an error state
	 // unless the activity timeout has expired
	 if(FA::NotSerious(e) && !ActivityTimedOut())
	    return MOVED;
	 SetError(error);
	 return MOVED;
      }
      if(res==-1 && errno!=EISCONN) {
	 // connection is in progress: wait until the socket is writable
	 Block(sock,POLLOUT);
	 return m;
      }
      connected=true;
      timeout_timer.Reset();
      m=MOVED;
   }
   if(!recv_buf) {
      recv_buf=new IOBufferFDStream(new FDStream(sock,"<input-socket>"),IOBuffer::GET);
   }
   if(!send_buf) {
      // our handshake goes out as soon as the output buffer exists
      send_buf=new IOBufferFDStream(new FDStream(sock,"<output-socket>"),IOBuffer::PUT);
      SendHandshake();
   }
   if(send_buf->Error())
   {
      LogError(2,"send: %s",send_buf->ErrorText());
      Disconnect(send_buf->ErrorText());
      return MOVED;
   }
   if(recv_buf->Error())
   {
      LogError(2,"receive: %s",recv_buf->ErrorText());
      Disconnect(recv_buf->ErrorText());
      return MOVED;
   }
   if(!peer_id) {
      // expect handshake
      unpack_status_t s=RecvHandshake();
      if(s==UNPACK_NO_DATA_YET)
	 return m;
      if(s!=UNPACK_SUCCESS) {
	 if(s==UNPACK_PREMATURE_EOF) {
	    if(recv_buf->Size()>0) {
	       LogError(2,_("peer unexpectedly closed connection after %s"),recv_buf->Dump());
	       Disconnect(_("peer unexpectedly closed connection"));
	    } else {
	       LogError(4,_("peer closed connection (before handshake)"));
	       Disconnect(_("peer closed connection (before handshake)"));
	    }
	 } else {
	    Disconnect(_("invalid peer response format"));
	 }
	 return MOVED;
      }
      // without metadata the peer is useful only if it can send it to us
      if(!parent->HasMetadata() && !LTEPExtensionEnabled()) {
	 Disconnect("peer cannot provide metadata");
	 return MOVED;
      }
      timeout_timer.Reset();
      // a peer reporting our own peer id is a connection to ourselves
      myself=peer_id.eq(Torrent::my_peer_id);
      if(myself)
	 return MOVED;
      SendExtensions();
      if(parent->HasMetadata())
	 peer_bitfield=new BitField(parent->total_pieces);
      // tell the peer what we have: have-none/have-all with the
      // Fast Extension, otherwise a bitfield when it is not empty
      if(FastExtensionEnabled()) {
	 if(parent->complete_pieces==0) {
	    LogSend(5,"have-none");
	    Packet(MSG_HAVE_NONE).Pack(send_buf);
	 } else if(parent->complete_pieces==parent->total_pieces) {
	    LogSend(5,"have-all");
	    Packet(MSG_HAVE_ALL).Pack(send_buf);
	 } else {
	    LogSend(5,"bitfield");
	    PacketBitField(parent->my_bitfield).Pack(send_buf);
	 }
      } else if(parent->my_bitfield && parent->my_bitfield->has_any_set()) {
	 LogSend(5,"bitfield");
	 PacketBitField(parent->my_bitfield).Pack(send_buf);
      }
      // announce the UDP listener port when DHT is enabled
      if(Torrent::listener_udp && DHT_Enabled()) {
	 int udp_port=Torrent::listener_udp->GetPort();
#if INET6
	 if(Torrent::listener_ipv6_udp && addr.sa.sa_family==AF_INET6)
	    udp_port=Torrent::listener_ipv6_udp->GetPort();
#endif
	 if(udp_port) {
	    LogSend(5,xstring::format("port(%d)",udp_port));
	    PacketPort(udp_port).Pack(send_buf);
	 }
      }
      keepalive_timer.Reset();
   }

   if(keepalive_timer.Stopped()) {
      LogSend(5,"keep-alive");
      Packet(MSG_KEEPALIVE).Pack(send_buf);
      keepalive_timer.Reset();
   }

   // stop reading while more than four blocks of output are pending
   if(send_buf->Size()>(int)Torrent::BLOCK_SIZE*4)
      recv_buf->Suspend();
   else
      recv_buf->Resume();

   if(recv_buf->IsSuspended())
      return m;

   // the timeout is restarted from the latest event time of either buffer
   timeout_timer.Reset(send_buf->EventTime());
   timeout_timer.Reset(recv_buf->EventTime());
   if(timeout_timer.Stopped()) {
      LogError(0,_("Timeout - reconnecting"));
      Disconnect("timed out");
      return MOVED;
   }

   if(!am_interested && interest_timer.Stopped()
   && HasNeededPieces() && parent->NeedMoreUploaders())
      SetAmInterested(true);

   if(am_interested && sent_queue.count()<MAX_QUEUE_LEN)
      SendDataRequests();

   if(peer_interested && am_choking && choke_timer.Stopped()
   && parent->AllowMoreDownloaders())
      SetAmChoking(false);

   // serve queued requests while the PUT allowance lasts and
   // less than two blocks of output are pending
   if(recv_queue.count()>0 && send_buf->Size()<(int)Torrent::BLOCK_SIZE*2) {
      unsigned bytes_allowed=BytesAllowed(RateLimit::PUT);
      while(bytes_allowed>=recv_queue[0]->req_length) {
	 bytes_allowed-=recv_queue[0]->req_length;
	 SendDataReply();
	 m=MOVED;
	 // SendDataReply can end with the peer disconnected
	 if(!Connected())
	    return m;
	 if(recv_queue.count()==0)
	    break;
	 if(send_buf->Size()>=(int)Torrent::BLOCK_SIZE)
	    m|=send_buf->Do();
	 if(send_buf->Size()>=(int)Torrent::BLOCK_SIZE*2)
	    break;
      }
   }

   if(recv_buf->Eof() && recv_buf->Size()==0) {
      LogError(4,_("peer closed connection"));
      Disconnect(_("peer closed connection"));
      return MOVED;
   }

   if(pex.send_timer.Stopped())
      SendPEXPeers();

   // handle one incoming packet, if a complete one is available
   Packet *reply=0;
   unpack_status_t st=UnpackPacket(recv_buf,&reply);
   if(st==UNPACK_NO_DATA_YET)
      return m;
   if(st!=UNPACK_SUCCESS)
   {
      if(st==UNPACK_PREMATURE_EOF) {
	 LogError(2,_("peer unexpectedly closed connection after %s"),recv_buf->Dump());
	 Disconnect(_("peer unexpectedly closed connection"));
      } else {
	 LogError(2,_("invalid peer response format"));
	 Disconnect(_("invalid peer response format"));
      }
      return MOVED;
   }
   reply->DropData(recv_buf);
   // HandlePacket disposes of the packet
   HandlePacket(reply);
   return MOVED;
}

// Unpack one packet from buffer `b`.
// On UNPACK_SUCCESS *p points to a newly allocated packet of the class
// matching the message type.  On any other status *p is 0.
// The generic header is unpacked first with a probe packet; messages
// without a payload reuse the probe itself, the others are unpacked
// again by a packet object of the specific class.
TorrentPeer::unpack_status_t TorrentPeer::UnpackPacket(SMTaskRef<IOBuffer>& b,TorrentPeer::Packet **p)
{
   Packet *&pp=*p;
   pp=0;

   Ref<Packet> probe(new Packet);
   unpack_status_t res=probe->Unpack(b);
   if(res!=UNPACK_SUCCESS)
      return res;

   LogRecvF(11,"got a packet, length=%d, type=%d(%s)\n",
      probe->GetLength(),probe->GetPacketType(),probe->GetPacketTypeText());

   switch(probe->GetPacketType())
   {
   case MSG_KEEPALIVE:
   case MSG_CHOKE:
   case MSG_UNCHOKE:
   case MSG_INTERESTED:
   case MSG_UNINTERESTED:
   case MSG_HAVE_ALL:
   case MSG_HAVE_NONE:
      // no payload: hand the probe itself to the caller
      pp=probe.borrow();
      break;
   case MSG_HAVE:
      pp=new PacketHave();
      break;
   case MSG_BITFIELD:
      pp=new PacketBitField();
      break;
   case MSG_REQUEST:
      pp=new PacketRequest();
      break;
   case MSG_PIECE:
      pp=new PacketPiece();
      break;
   case MSG_CANCEL:
      pp=new PacketCancel();
      break;
   case MSG_PORT:
      pp=new PacketPort();
      break;
   case MSG_SUGGEST_PIECE:
      pp=new PacketSuggestPiece();
      break;
   case MSG_ALLOWED_FAST:
      pp=new PacketAllowedFast();
      break;
   case MSG_REJECT_REQUEST:
      pp=new PacketRejectRequest();
      break;
   case MSG_EXTENDED:
      pp=new PacketExtended();
      break;
   }
   // probe is still set only when a specific packet object was created above
   // NOTE(review): presumably borrow() leaves the Ref empty - confirm in Ref.h
   if(probe)
      res=pp->Unpack(b);
   if(res!=UNPACK_SUCCESS)
   {
      switch(res)
      {
      case UNPACK_PREMATURE_EOF:
	 LogError(0,"premature eof");
	 break;
      case UNPACK_WRONG_FORMAT:
	 LogError(0,"wrong packet format");
	 break;
      case UNPACK_NO_DATA_YET:
      case UNPACK_SUCCESS:
	 ;
      }
      // discard the packet's data from the buffer and free the packet
      if(probe)
	 probe->DropData(b);
      else
	 pp->DropData(b);
      delete pp;
      pp=0;
   }
   return res;
}

// Return a printable name for the packet type.
// The table is indexed by type+1 (its first entry is "keep-alive");
// numeric strings fill the gaps between named types.
const char *TorrentPeer::Packet::GetPacketTypeText() const
{
   static const char *const names[]={
      "keep-alive",
      "choke", "unchoke", "interested", "uninterested", "have",
      "bitfield", "request", "piece", "cancel", "port",
      "10", "11", "12",
      "suggest-piece", "have-all", "have-none", "reject-request", "allowed-fast",
      "18", "19",
      "extended",
   };
   const int slot=type+1;
   return names[slot];
}

// Unpack the generic packet header from `b`: a 32-bit big-endian length
// followed, for non-empty packets, by one type byte.
// A zero length is a keep-alive.  Lengths above 1 MiB (or negative) and
// unknown types give UNPACK_WRONG_FORMAT.  When the buffer does not hold
// the whole packet yet, the result is UNPACK_NO_DATA_YET, or
// UNPACK_PREMATURE_EOF if the buffer is at EOF.
// `unpacked` counts the header bytes consumed so far.
TorrentPeer::unpack_status_t TorrentPeer::Packet::Unpack(const Buffer *b)
{
   unpacked=0;
   if(b->Size()<4)
      return b->Eof()?UNPACK_PREMATURE_EOF:UNPACK_NO_DATA_YET;

   length=b->UnpackUINT32BE(0);
   unpacked=4;
   if(length==0) {
      type=MSG_KEEPALIVE;
      return UNPACK_SUCCESS;
   }

   const bool bad_length=(length<0 || length>1024*1024);
   if(bad_length) {
      LogError(4,"invalid length %d",length);
      return UNPACK_WRONG_FORMAT;
   }

   // the whole packet must be in the buffer before going further
   if(b->Size()<length+4)
      return b->Eof()?UNPACK_PREMATURE_EOF:UNPACK_NO_DATA_YET;

   const int code=b->UnpackUINT8(4);
   unpacked=5;
   if(!is_valid_reply(code)) {
      LogError(4,"unknown packet type %d, length %d",code,length);
      return UNPACK_WRONG_FORMAT;
   }
   type=(packet_type)code;
   return UNPACK_SUCCESS;
}

// Returns true when the raw bytes of this peer's address equal those of
// peer o (byte-wise comparison over sizeof(addr)).
bool TorrentPeer::AddressEq(const TorrentPeer *o) const
{
   const int diff=memcmp(&addr,&o->addr,sizeof(addr));
   return diff==0;
}

// Builds a display name "[address]:port" followed by a suffix telling where
// the peer came from: /A, /D or /X for the TR_ACCEPTED, TR_DHT and TR_PEX
// markers, or "/<tracker number>" when the torrent has more than one tracker.
// The result lives in the buffer returned by xstring::format.
const char *TorrentPeer::GetName() const
{
   xstring& name=xstring::format("[%s]:%d",addr.address(),addr.port());

   const char *origin=0;
   if(tracker_no==TR_ACCEPTED)
      origin="/A";
   else if(tracker_no==TR_DHT)
      origin="/D";
   else if(tracker_no==TR_PEX)
      origin="/X";

   if(origin)
      name.append(origin);
   else if(parent->trackers.count()>1)
      name.appendf("/%d",tracker_no+1);
   return name;
}

const char *TorrentPeer::Status()
{
   if(sock==-1) {
      if(last_dc)
	 return xstring::format("Disconnected (%s)",last_dc.get());
      return _("Not connected");
   }
   if(!connected)
      return _("Connecting...");
   if(!peer_id)
      return _("Handshaking...");
   xstring &buf=xstring::format("dn:%s %sup:%s %s",
      xhuman(peer_recv),peer_recv_rate.GetStrS(),
      xhuman(peer_sent),peer_send_rate.GetStrS());
   if(peer_interested)
      buf.append("peer-interested ");
   if(peer_choking)
      buf.append("peer-choking ");
   if(am_interested)
      buf.append("am-interested ");
   if(am_choking)
      buf.append("am-choking ");
   if(parent->HasMetadata()) {
      if(peer_complete_pieces<parent->total_pieces)
	 buf.appendf("complete:%u/%u (%u%%)",peer_complete_pieces,parent->total_pieces,
	    peer_complete_pieces*100/parent->total_pieces);
      else
	 buf.append("complete");
   }
   return buf;
}

// Creates a packet of type t.  The initial length accounts for the one-byte
// type id, which is present only for non-negative types.
TorrentPeer::Packet::Packet(packet_type t)
{
   type=t;
   length=(type>=0 ? 1 : 0);
}
// Writes the packet header to b: the 32-bit big-endian length and then,
// for non-negative types only, the one-byte type id.
void TorrentPeer::Packet::Pack(SMTaskRef<IOBuffer>& b)
{
   b->PackUINT32BE(length);
   if(type<0)
      return;  // negative types have no type byte on the wire
   b->PackUINT8(type);
}

// Creates a MSG_BITFIELD packet holding a private copy of *bf; the packet
// length grows by the copy's count().
TorrentPeer::PacketBitField::PacketBitField(const BitField *bf)
   : Packet(MSG_BITFIELD)
{
   BitField *copy=new BitField();
   copy->set(*bf);
   bitfield=copy;
   length+=bitfield->count();
}
// Destructor with an empty body: no explicit cleanup is performed here.
TorrentPeer::PacketBitField::~PacketBitField()
{
}
// Parses a bitfield packet from b.  After the common header is accepted,
// every remaining byte of the packet is copied into a newly allocated
// BitField sized to hold exactly those bytes.
TorrentPeer::unpack_status_t TorrentPeer::PacketBitField::Unpack(const Buffer *b)
{
   const unpack_status_t header=Packet::Unpack(b);
   if(header!=UNPACK_SUCCESS)
      return header;

   // bytes of the packet that follow the already consumed header
   const int payload=length+4-unpacked;
   bitfield=new BitField(payload*8);
   memcpy(bitfield->get_non_const(),b->Get()+unpacked,payload);
   unpacked+=payload;
   return UNPACK_SUCCESS;
}
// Recomputes the packet length: the base Packet length plus
// bitfield->count().
void TorrentPeer::PacketBitField::ComputeLength()
{
   Packet::ComputeLength();
   length+=bitfield->count();
}
// Serializes the packet into b: the common header followed by the raw
// bitfield bytes.
void TorrentPeer::PacketBitField::Pack(SMTaskRef<IOBuffer>& b)
{
   Packet::Pack(b);
   const char *raw=(const char*)(bitfield->get());
   b->Put(raw,bitfield->count());
}

// Creates a packet of type t carrying the three values index (i), begin (b)
// and req_length (l); 12 bytes are added to the base packet length for them.
TorrentPeer::_PacketIBL::_PacketIBL(packet_type t,unsigned i,unsigned b,unsigned l)
   : Packet(t), index(i), begin(b), req_length(l)
{
   length+=12;
}
// Parses an index/begin/length packet from b.
// Returns whatever Packet::Unpack reports for the common header, or
// UNPACK_WRONG_FORMAT when the declared packet length does not leave room
// for the three 32-bit fields.  On success index, begin and req_length are
// filled in and unpacked is advanced past them.
TorrentPeer::unpack_status_t TorrentPeer::_PacketIBL::Unpack(const Buffer *b)
{
   unpack_status_t res;
   res=Packet::Unpack(b);
   if(res!=UNPACK_SUCCESS)
      return res;
   // Packet::Unpack only guarantees that length+4 bytes are buffered.
   // Without this check a peer declaring a shorter packet would have the
   // fields read from whatever follows the packet in the buffer.
   if(length+4-unpacked<12) {
      LogError(4,"packet is too short, length %d",length);
      return UNPACK_WRONG_FORMAT;
   }
   index=b->UnpackUINT32BE(unpacked);unpacked+=4;
   begin=b->UnpackUINT32BE(unpacked);unpacked+=4;
   req_length=b->UnpackUINT32BE(unpacked);unpacked+=4;
   return UNPACK_SUCCESS;
}
// Recomputes the packet length: the base Packet length plus 12 bytes for
// the index, begin and req_length fields.
void TorrentPeer::_PacketIBL::ComputeLength()
{
   Packet::ComputeLength();
   length+=12;
}
// Serializes the packet into b: the common header followed by index, begin
// and req_length, each as a 32-bit big-endian value.
void TorrentPeer::_PacketIBL::Pack(SMTaskRef<IOBuffer>& b)
{
   Packet::Pack(b);
   b->PackUINT32BE(index);
   b->PackUINT32BE(begin);
   b->PackUINT32BE(req_length);
}
// Parses one bencoded node from b, starting at *offset and not reading past
// limit (which must not exceed the buffer size).  On success the node is
// stored in *out and *offset is advanced by the number of bytes consumed.
// When nothing could be parsed: UNPACK_WRONG_FORMAT if the parser reports
// unconsumed bytes, otherwise PREMATURE_EOF / NO_DATA_YET depending on
// whether the buffer has reached end of file.
TorrentPeer::unpack_status_t TorrentPeer::Packet::UnpackBencoded(const Buffer *b,int *offset,int limit,Ref<BeNode> *out)
{
   assert(limit<=b->Size());
   const int avail=limit-*offset;
   int left=avail;
   *out=BeNode::Parse(b->Get()+*offset,avail,&left);
   if(!*out) {
      if(left>0)
         return UNPACK_WRONG_FORMAT;
      if(b->Eof())
         return UNPACK_PREMATURE_EOF;
      return UNPACK_NO_DATA_YET;
   }
   const int consumed=avail-left;
   *offset+=consumed;
   return UNPACK_SUCCESS;
}


// Creates a bit field able to hold `bits` bits, all cleared.  Storage is
// rounded up to whole bytes.
BitField::BitField(int bits) {
   const int nbytes=(bits+7)/8;
   bit_length=bits;
   get_space(nbytes);
   memset(buf,0,nbytes);
   set_length(nbytes);
}
// Returns the value of bit i.  Bits are numbered from the most significant
// bit of each byte (mask 0x80 for i%8==0).
bool BitField::get_bit(int i) const {
   const int mask=0x80>>(i%8);
   return ((*this)[i/8]&mask)!=0;
}
// Sets (value==true) or clears (value==false) bit i, using the same
// most-significant-bit-first numbering as get_bit.
void BitField::set_bit(int i,bool value) {
   const int mask=(0x80>>(i%8));
   unsigned char &cell=(*this)[i/8];
   if(!value)
      cell&=~mask;
   else
      cell|=mask;
}
// Returns true when at least one bit in the half-open range [from,to) is set.
bool BitField::has_any_set(int from,int to) const {
   int bit=from;
   while(bit<to) {
      if(get_bit(bit))
         return true;
      bit++;
   }
   return false;
}
// Returns true when every bit in the half-open range [from,to) is set
// (trivially true for an empty range).
bool BitField::has_all_set(int from,int to) const {
   int bit=from;
   while(bit<to) {
      if(!get_bit(bit))
         return false;
      bit++;
   }
   return true;
}
// Assigns `value` to every bit in the half-open range [from,to).
void BitField::set_range(int from,int to,bool value) {
   int bit=from;
   while(bit<to) {
      set_bit(bit,value);
      bit++;
   }
}

void TorrentBlackList::check_expire()
{
   for(Timer *e=bl.each_begin(); e; e=bl.each_next()) {
      if(e->Stopped()) {
	 LogNote(4,"black-delisting peer %s\n",bl.each_key().get());
	 bl.remove(bl.each_key());
      }
   }
}
// Black-lists address a for the time interval described by string t.
// Expired entries are purged first; an address that is already listed is
// left untouched (its timer is not restarted).
void TorrentBlackList::Add(const sockaddr_u &a,const char *t)
{
   check_expire();
   if(!Listed(a)) {
      LogNote(4,"black-listing peer %s (%s)\n",(const char*)a,t);
      bl.add(a.to_xstring(),new Timer(TimeIntervalR(t)));
   }
}
// Returns true when the string form of address a is present as a key in
// the blacklist map.  Expired entries are not purged by this call.
bool TorrentBlackList::Listed(const sockaddr_u &a)
{
   return bl.lookup(a.to_xstring())!=0;
}


// Remembers the address family (a) and socket type (t).  No socket is
// opened here: sock starts at -1 and the UDP send counter at zero.
TorrentListener::TorrentListener(int a,int t)
   : af(a), type(t), sock(-1), last_sent_udp_count(0)
{
}
// Closes the listening socket if one is open.
TorrentListener::~TorrentListener()
{
   if(sock==-1)
      return;
   close(sock);
}
// Initializes addr through sockaddr_u::set_defaults for this listener's
// address family and the given port.
// NOTE(review): the "torrent" argument is presumably a settings prefix used
// to look up a bind address - confirm against set_defaults.
void TorrentListener::FillAddress(int port)
{
   addr.set_defaults(af,"torrent",port);
}

// Returns true when none of the registered torrents reports CanAccept()
// (also true when there are no torrents at all).
bool Torrent::NoTorrentCanAccept()
{
   const Torrent *t=torrents.each_begin();
   while(t) {
      if(t->CanAccept())
         return false;
      t=torrents.each_next();
   }
   return true;
}

int TorrentListener::Do()
{
   int m=STALL;
   if(error)
      return m;
   if(sock==-1) {
      int proto=(type==SOCK_STREAM?IPPROTO_TCP:IPPROTO_UDP);
      sock=SocketCreateUnbound(af,type,proto,0);
      if(sock==-1) {
	 if(NonFatalError(errno))
	    return m;
	 error=Error::Fatal(_("cannot create socket of address family %d"),addr.sa.sa_family);
	 return MOVED;
      }
      SocketSinglePF(sock,af);

      // Try to assign a port from given range
      Range range(ResMgr::Query("torrent:port-range",0));

      // but first try already allocated port
      int prefer_port=Torrent::GetPort();
      if(prefer_port) {
	 ReuseAddress(sock);   // try to reuse address.
	 FillAddress(prefer_port);
	 if(addr.bind_to(sock)==0)
	    goto bound;
	 LogError(1,"bind(%s): %s",addr.to_string(),strerror(errno));
      }

      for(int t=0; ; t++)
      {
	 if(t>=10)
	 {
	    close(sock);
	    sock=-1;
	    TimeoutS(1);	 // retry later.
	    return m;
	 }
	 if(t==9)
	    ReuseAddress(sock);   // try to reuse address.

	 int port=0;
	 if(!range.IsFull())
	    port=range.Random();
	 if(!port && type==SOCK_DGRAM)
	    port=Range("1024-65535").Random();

	 if(!port)
	     break;	// nothing to bind

	 FillAddress(port);
	 if(addr.bind_to(sock)==0)
	    break;
	 int saved_errno=errno;

	 // Fail unless socket was already taken
	 if(errno!=EINVAL && errno!=EADDRINUSE)
	 {
	    LogError(0,"bind(%s): %s",addr.to_string(),strerror(saved_errno));
	    close(sock);
	    sock=-1;
	    if(NonFatalError(errno))
	    {
	       TimeoutS(1);
	       return m;
	    }
	    error=Error::Fatal(_("Cannot bind a socket for torrent:port-range"));
	    return MOVED;
	 }
	 LogError(10,"bind(%s): %s",addr.to_string(),strerror(saved_errno));
      }
   bound:
      if(type==SOCK_STREAM)
	 listen(sock,5);

      // get the allocated port
      socklen_t addr_len=sizeof(addr);
      getsockname(sock,&addr.sa,&addr_len);
      LogNote(4,"listening on %s %s",type==SOCK_STREAM?"tcp":"udp",addr.to_string());
      m=MOVED;

      if(type==SOCK_DGRAM && Torrent::dht)
	 Torrent::GetDHT(af)->Load();
   }

   if(type==SOCK_DGRAM) {
      if(!Ready(sock,POLLIN)) {
	 Block(sock,POLLIN);
	 return m;
      }
      char buf[0x4000];
      sockaddr_u src;
      socklen_t src_len=sizeof(src);
      int res=recvfrom(sock,buf,sizeof(buf),0,&src.sa,&src_len);
      if(res==-1) {
	 if(!E_RETRY(errno))
	    LogError(9,"recvfrom: %s",strerror(errno));
	 Block(sock,POLLIN);
	 return m;
      }
      if(res==0)
	 return MOVED;
      rate.Add(1);
      Torrent::DispatchUDP(buf,res,src);
      return MOVED;
   }

   if(rate.Get()>5 || Torrent::NoTorrentCanAccept())
   {
      TimeoutS(1);
      return m;
   }

   if(!Ready(sock,POLLIN)) {
      Block(sock,POLLIN);
      return m;
   }

   sockaddr_u remote_addr;
   int a=SocketAccept(sock,&remote_addr);
   if(a==-1) {
      Block(sock,POLLIN);
      return m;
   }
   rate.Add(1);
   LogNote(3,_("Accepted connection from [%s]:%d"),remote_addr.address(),remote_addr.port());
   (void)new TorrentDispatcher(a,&remote_addr);
   m=MOVED;

   return m;
}
bool TorrentListener::MaySendUDP()
{
   // limit udp rate
   if(last_sent_udp_count>=10 && now==last_sent_udp)
      UpdateNow();
   TimeDiff time_passed(now,last_sent_udp);
   if(time_passed.MilliSeconds()<1) {
      if(last_sent_udp_count>=10) {
	 Timeout(1);
	 return false;
      }
      last_sent_udp_count++;
   } else {
      last_sent_udp_count=0;
      last_sent_udp=now;
   }
   // check if output buffer is available
   struct pollfd pfd;
   pfd.fd=sock;
   pfd.events=POLLOUT;
   pfd.revents=0;
   int res=poll(&pfd,1,0);
   if(res>0)
      return true;
   Block(sock,POLLOUT);
   return false;
}
// Sends buf as one datagram to address a.  Returns the sendto() result;
// a failure (-1) is logged with the destination and the system error text.
int TorrentListener::SendUDP(const sockaddr_u& a,const xstring& buf)
{
   const int sent=sendto(sock,buf,buf.length(),0,&a.sa,a.addr_len());
   if(sent!=-1)
      return sent;
   LogError(0,"sendto(%s): %s",a.to_string(),strerror(errno));
   return sent;
}

// Routes one received UDP datagram (len bytes at buf, sent from src).
// A datagram that starts with 'd' and ends with 'e' is parsed as a bencoded
// message when DHT is enabled and passed to the DHT instance selected for
// src.  A datagram starting with byte 0x41 is only logged as a uTP SYN.
// Everything else, including a failed parse, is logged as a hex dump.
void Torrent::DispatchUDP(const char *buf,int len,const sockaddr_u& src)
{
   bool recognized=true;
   if(buf[0]=='d' && buf[len-1]=='e' && dht) {
      int rest;
      Ref<BeNode> msg(BeNode::Parse(buf,len,&rest));
      if(!msg) {
         recognized=false;
      } else {
         const SMTaskRef<DHT> &d=Torrent::GetDHT(src);
         d->Enter();
         d->HandlePacket(msg.get_non_const(),src);
         d->Leave();
      }
   } else if(buf[0]==0x41) {
      LogRecv(9,xstring::format("uTP SYN v1 from %s {%s}",src.to_string(),xstring::get_tmp(buf,len).hexdump()));
   } else {
      recognized=false;
   }

   if(!recognized)
      LogRecv(4,xstring::format("udp from %s {%s}",src.to_string(),xstring::get_tmp(buf,len).hexdump()));
}

void Torrent::Dispatch(const xstring& info_hash,int sock,const sockaddr_u *remote_addr,IOBuffer *recv_buf)
{
   Torrent *t=FindTorrent(info_hash);
   if(!t) {
      LogError(3,_("peer sent unknown info_hash=%s in handshake"),info_hash.hexdump());
      close(sock);
      Delete(recv_buf);
      return;
   }
   t->Accept(sock,remote_addr,recv_buf);
}

// Takes an accepted socket s and a copy of the peer address *a, wraps the
// socket in an input IOBuffer and remembers the address in string form.
// The timeout timer is constructed with 60 (presumably seconds - confirm
// against Timer).
TorrentDispatcher::TorrentDispatcher(int s,const sockaddr_u *a)
   : sock(s), addr(*a),
     recv_buf(new IOBufferFDStream(new FDStream(sock,"<input-socket>"),IOBuffer::GET)),
     timeout_timer(60),
     peer_name(addr.to_xstring())
{
}
// Closes the socket unless it has been handed over (sock set to -1).
TorrentDispatcher::~TorrentDispatcher()
{
   if(sock==-1)
      return;
   close(sock);
}
// Waits for the beginning of a peer handshake and dispatches the connection.
// The needed prefix is: 1 length byte, the protocol string of that length,
// 8 reserved bytes and the info hash (SHA1_DIGEST_SIZE bytes).  Once it is
// buffered, the socket and receive buffer are passed to Torrent::Dispatch
// and this object deletes itself.  It also deletes itself on timeout or
// when the peer closes the connection too early.
int TorrentDispatcher::Do()
{
   if(timeout_timer.Stopped())
   {
      LogError(1,_("peer handshake timeout"));
      Delete(this);
      return MOVED;
   }

   // the first byte tells how long the protocol name is
   unsigned proto_len=0;
   if(recv_buf->Size()>0)
      proto_len=recv_buf->UnpackUINT8();

   const unsigned needed=1+proto_len+8+SHA1_DIGEST_SIZE;
   if((unsigned)recv_buf->Size()<needed) {
      if(!recv_buf->Eof())
         return STALL;   // keep waiting for more data
      if(recv_buf->Size()>0)
         LogError(1,_("peer short handshake"));
      else
         LogError(4,_("peer closed just accepted connection"));
      Delete(this);
      return MOVED;
   }

   // skip the length byte, the protocol name and 8 reserved bytes
   int hash_offset=1;
   hash_offset+=proto_len;
   hash_offset+=8;

   const char *data=recv_buf->Get();
   xstring peer_info_hash(data+hash_offset,SHA1_DIGEST_SIZE);

   Torrent::Dispatch(peer_info_hash,sock,&addr,recv_buf.borrow());
   sock=-1;   // the socket now belongs to the callee; don't close it here
   Delete(this);
   return MOVED;
}

///
// Creates a job wrapping torrent t; the job starts neither completed nor
// done.
TorrentJob::TorrentJob(Torrent *t)
   : torrent(t), completed(false), done(false)
{
}
// Destructor with an empty body: no explicit cleanup is performed here.
TorrentJob::~TorrentJob()
{
}
// Marks the job as done and drops the torrent reference before delegating
// to Job::PrepareToDie().  With done set, Do() returns early and no longer
// dereferences torrent.
void TorrentJob::PrepareToDie()
{
   done=true;
   torrent=0;
   Job::PrepareToDie();
}

int TorrentJob::Do()
{
   if(done)
      return STALL;
   if(torrent->Done()) {
      done=true;
      const Error *e=torrent->GetInvalidCause();
      if(e)
	 eprintf("%s\n",e->Text());
      return MOVED;
   }
   if(!completed && torrent->Complete()) {
      if(parent->WaitsFor(this) && !torrent->IsSharing()) {
	 PrintStatus(1,"");
	 printf(_("Seeding in background...\n"));
	 parent->RemoveWaiting(this);
      }
      completed=true;
      return MOVED;
   }
   return STALL;
}

// Appends a multi-line status report of the torrent to s and returns s.
// Every line is prefixed with tab.  The verbosity v controls the detail:
// v>1 adds tracker and DHT status, v>2 adds the info hash and sizes, and
// the per-peer list is shown when there are at most 5 peers or v>1
// (unconnected peers only with v>2).  Nothing about peers is printed while
// the torrent is shutting down.
xstring& TorrentJob::FormatStatus(xstring& s,int v,const char *tab)
{
   if(torrent->IsDownloading())
      torrent->CalcPiecesStats();

   const char *torrent_name=torrent->GetName();
   if(torrent_name)
      s.appendf("%sName: %s\n",tab,torrent_name);

   const char *state=torrent->Status();
   if(*state)
      s.appendf("%s%s\n",tab,state);

   if(torrent->IsDownloading()) {
      s.appendf("%spiece availability: min %u, avg %.2f, %d%% available\n",tab,
         torrent->MinPieceSources(),torrent->AvgPieceSources(),torrent->PiecesAvailablePct());
      if(torrent->GetRatio()>0) {
         s.appendf("%sratio: %.2f/%.2f/%.2f\n",tab,
            torrent->GetMinPerPieceRatio(),torrent->GetRatio(),
            torrent->GetMaxPerPieceRatio());
      }
   }

   if(v>2) {
      s.appendf("%sinfo hash: %s\n",tab,torrent->GetInfoHash().hexdump());
      if(torrent->HasMetadata()) {
         s.appendf("%stotal length: %llu\n",tab,torrent->TotalLength());
         s.appendf("%spiece length: %u\n",tab,torrent->PieceLength());
      }
   }

   if(v>1) {
      // a single tracker goes on one line, several get a numbered list
      const int tracker_count=torrent->Trackers().count();
      if(tracker_count==1) {
         s.appendf("%stracker: %s - %s\n",tab,torrent->Trackers()[0]->GetURL(),
               torrent->Trackers()[0]->Status());
      } else if(tracker_count>1) {
         s.appendf("%strackers:\n",tab);
         for(int n=0; n<tracker_count; n++) {
            s.appendf("%s%2d. %s - %s\n",tab,n+1,torrent->Trackers()[n]->GetURL(),
                  torrent->Trackers()[n]->Status());
         }
      }
      const char *dht_state=torrent->DHT_Status();
      if(*dht_state)
         s.appendf("%sDHT: %s\n",tab,dht_state);
   }

   if(torrent->ShuttingDown())
      return s;

   const bool list_peers=(torrent->GetPeersCount()<=5 || v>1);
   if(!list_peers) {
      // too many peers for the requested verbosity: print totals only
      s.appendf("%s  peers:%d connected:%d active:%d complete:%d\n",tab,
         torrent->GetPeersCount(),torrent->GetConnectedPeersCount(),
         torrent->GetActivePeersCount(),torrent->GetCompletePeersCount());
      return s;
   }

   const TaskRefArray<TorrentPeer>& peers=torrent->GetPeers();
   const int idle_peers=torrent->GetPeersCount()-torrent->GetConnectedPeersCount();
   if(v<=2 && idle_peers>0)
      s.appendf("%s  not connected peers: %d\n",tab,idle_peers);
   for(int n=0; n<peers.count(); n++) {
      if(peers[n]->Connected() || v>2)
         s.appendf("%s  %s: %s\n",tab,peers[n]->GetName(),peers[n]->Status());
   }
   return s;
}

// Shows "<name>: <status>" on the status line.  The name is squeezed to the
// width left after the status text and 3 more columns, clamped to the
// range 8..40.
void TorrentJob::ShowRunStatus(const SMTaskRef<StatusLine>& s)
{
   const xstring& status=torrent->Status();
   const char *name=torrent->GetName();
   int name_width=s->GetWidthDelayed()-status.length()-3;
   if(name_width<8)
      name_width=8;
   else if(name_width>40)
      name_width=40;
   s->Show("%s: %s",squeeze_file_name(name,name_width),status.get());
}

// Signal handler of the job.  The first signal starts a shutdown of the
// torrent and returns MOVED; when there is no torrent any more or it is
// already shutting down, WANTDIE is returned instead.
int TorrentJob::AcceptSig(int sig)
{
   const bool nothing_to_stop=(!torrent || torrent->ShuttingDown());
   if(nothing_to_stop)
      return WANTDIE;
   torrent->Shutdown();
   return MOVED;
}


#include "CmdExec.h"
CDECL_BEGIN
#include <glob.h>
CDECL_END

// Implementation of the `torrent' command.
// Parses the options, expands local wildcards in the remaining arguments
// and starts one TorrentJob per resulting torrent argument.  Always returns
// 0; the jobs are registered with the parent via AddNewJob.
CMD(torrent)
{
   Torrent::ClassInit();

   // shorthands for the parent's argument vector and error printer;
   // note that only `args' is #undef'ed at the end of the function.
#define args (parent->args)
#define eprintf parent->eprintf
   enum {
      OPT_OUTPUT_DIRECTORY,
      OPT_FORCE_VALID,
      OPT_DHT_BOOTSTRAP,
      OPT_SHARE,
      OPT_ONLY_NEW,
      OPT_ONLY_INCOMPLETE,
   };
   static const struct option torrent_opts[]=
   {
      {"output-directory",required_argument,0,OPT_OUTPUT_DIRECTORY},
      {"force-valid",no_argument,0,OPT_FORCE_VALID},
      {"dht-bootstrap",required_argument,0,OPT_DHT_BOOTSTRAP},
      {"share",no_argument,0,OPT_SHARE},
      {"only-new",no_argument,0,OPT_ONLY_NEW},
      {"only-incomplete",no_argument,0,OPT_ONLY_INCOMPLETE},
      {0}
   };
   const char *output_dir=0;
   const char *dht_bootstrap=0;
   bool force_valid=false;
   bool share=false;
   bool only_new=false;
   bool only_incomplete=false;

   args->rewind();
   int opt;
   while((opt=args->getopt_long("O:",torrent_opts,0))!=EOF)
   {
      switch(opt)
      {
      case(OPT_OUTPUT_DIRECTORY):
      case('O'):
	 output_dir=optarg;
	 break;
      case(OPT_FORCE_VALID):
	 force_valid=true;
	 break;
      case(OPT_DHT_BOOTSTRAP):
	 // the bootstrap request is issued immediately, during parsing
	 dht_bootstrap=optarg;
	 Torrent::BootstrapDHT(dht_bootstrap);
	 break;
      case(OPT_SHARE):
	 share=true;
	 break;
      case(OPT_ONLY_NEW):
	 only_new=true;
	 // fallthrough: --only-new also turns on only_incomplete
      case(OPT_ONLY_INCOMPLETE):
	 only_incomplete=true;
	 break;
      case('?'):
      // also reached by the `goto try_help' below when no torrent was given
      try_help:
	 eprintf(_("Try `help %s' for more information.\n"),args->a0());
	 return 0;
      }
   }
   args->back();

   // --share cannot be combined with the download-oriented options
   if(share && output_dir) {
      eprintf(_("%s: --share conflicts with --output-directory.\n"),args->a0());
      return 0;
   }
   if(share && only_new) {
      eprintf(_("%s: --share conflicts with --only-new.\n"),args->a0());
      return 0;
   }
   if(share && only_incomplete) {
      eprintf(_("%s: --share conflicts with --only-incomplete.\n"),args->a0());
      return 0;
   }

   // command name plus options; used below as the prefix of each job's cmdline
   xstring_ca torrent_opt(args->Combine(0,args->getindex()+1));

   xstring_ca cwd(xgetcwd());
   if(output_dir) {
      // resolve relative to cwd and keep a private copy on the stack
      // (presumably because dir_file returns temporary storage - confirm)
      output_dir=dir_file(cwd,expand_home_relative(output_dir));
      output_dir=alloca_strdup(output_dir);
   } else
      output_dir=cwd;

   // args_g collects the final list of torrents after wildcard expansion
   Ref<ArgV> args_g(new ArgV(args->a0()));
   const char *torrent;
   while((torrent=args->getnext())!=0) {
      int globbed=0;
      // URLs are not globbed, except in share mode where every argument
      // is treated as a local path
      if(share || !url::is_url(torrent)) {
	 glob_t pglob;
	 glob(expand_home_relative(torrent),0,0,&pglob);
	 if(pglob.gl_pathc>0) {
	    for(unsigned i=0; i<pglob.gl_pathc; i++) {
	       const char *f=pglob.gl_pathv[i];
	       struct stat st;
	       // when not sharing, only regular files are accepted
	       if(share || (stat(f,&st)!=-1 && S_ISREG(st.st_mode))) {
		  args_g->Add(dir_file(cwd,f));
		  globbed++;
	       }
	    }
	 }
	 globfree(&pglob);
      }
      // nothing matched: pass the argument through unchanged
      if(!globbed)
	 args_g->Add(torrent);
   }

   torrent=args_g->getnext();
   if(!torrent)
   {
      // a bare --dht-bootstrap is a complete command on its own
      if(dht_bootstrap)
	 return 0;
      if(share)
	 eprintf(_("%s: Please specify a file or directory to share.\n"),args->a0());
      else
	 eprintf(_("%s: Please specify meta-info file or URL.\n"),args->a0());
      goto try_help;
   }
   while(torrent) {
      Torrent *t=new Torrent(torrent,cwd,output_dir);
      if(force_valid)
	 t->ForceValid();
      if(share)
	 t->Share();
      if(only_new)
	 t->StopIfKnown();
      if(only_incomplete)
	 t->StopIfComplete();
      TorrentJob *tj=new TorrentJob(t);
      tj->cmdline.set(xstring::cat(torrent_opt," ",torrent,NULL));
      parent->AddNewJob(tj);
      torrent=args_g->getnext();
   }
   return 0;
#undef args
}

#include "modconfig.h"
#ifndef MODULE_CMD_TORRENT
# define module_init cmd_torrent_module_init
#endif
// Module entry point: initializes the Torrent class and registers the
// `torrent' command together with its help text.  When MODULE_CMD_TORRENT
// is not defined, the macro above renames this function to
// cmd_torrent_module_init.
// NOTE(review): the help text does not mention the --only-new and
// --only-incomplete options accepted by the command.
CDECL void module_init()
{
   Torrent::ClassInit();
   CmdExec::RegisterCommand("torrent",cmd_torrent,0,
	 N_("Start BitTorrent job for the given torrent-files, which can be a local file,\n"
	 "URL, magnet link or plain info_hash written in hex or base32. Local wildcards\n"
	 "are expanded. Options:\n"
	 " -O <base>      specifies base directory where files should be placed\n"
	 " --force-valid  skip file validation\n"
	 " --dht-bootstrap=<node>  bootstrap DHT by sending a query to the node\n"
	 " --share        share specified file or directory\n"));
}