/* * lftp - file transfer program * * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "Torrent.h" #include "TorrentTracker.h" #include "DHT.h" #include "log.h" #include "url.h" #include "misc.h" #include "plural.h" CDECL_BEGIN #include "human.h" CDECL_END static ResType torrent_vars[] = { {"torrent:port-range", "6881-6889", ResMgr::RangeValidate, ResMgr::NoClosure}, {"torrent:max-peers", "60", ResMgr::UNumberValidate}, {"torrent:save-metadata", "yes", ResMgr::BoolValidate, ResMgr::NoClosure}, {"torrent:stop-min-ppr", "1.4", ResMgr::FloatValidate}, {"torrent:stop-on-ratio", "2.0", ResMgr::FloatValidate}, {"torrent:seed-max-time", "30d", ResMgr::TimeIntervalValidate}, {"torrent:seed-min-peers", "3", ResMgr::UNumberValidate}, {"torrent:ip", "", ResMgr::IPv4AddrValidate, ResMgr::NoClosure}, {"torrent:retracker", ""}, {"torrent:use-dht", "yes", ResMgr::BoolValidate, ResMgr::NoClosure}, {"torrent:timeout", "7d", ResMgr::TimeIntervalValidate, ResMgr::NoClosure}, #if INET6 {"torrent:ipv6", "", ResMgr::IPv6AddrValidate, ResMgr::NoClosure}, #endif {0} }; static ResDecls torrent_vars_register(torrent_vars); #if INET6 #ifdef HAVE_IFADDRS_H #include #endif static const char *FindGlobalIPv6Address() { #ifdef HAVE_IFADDRS_H struct ifaddrs *ifaddrs=0; getifaddrs(&ifaddrs); for(struct ifaddrs *ifa=ifaddrs; ifa; ifa=ifa->ifa_next) { if(ifa->ifa_addr && ifa->ifa_addr->sa_family==AF_INET6) { struct in6_addr *addr=&((struct sockaddr_in6*)ifa->ifa_addr)->sin6_addr; if(!IN6_IS_ADDR_UNSPECIFIED(addr) && !IN6_IS_ADDR_LOOPBACK(addr) && !IN6_IS_ADDR_LINKLOCAL(addr) && !IN6_IS_ADDR_SITELOCAL(addr) && !IN6_IS_ADDR_MULTICAST(addr)) { char *buf=xstring::tmp_buf(INET6_ADDRSTRLEN); inet_ntop(AF_INET6, addr, buf, INET6_ADDRSTRLEN); freeifaddrs(ifaddrs); return buf; } } } freeifaddrs(ifaddrs); #endif return 0; } #endif void Torrent::ClassInit() { static bool inited; if(inited) return; inited=true; #if INET6 const char *ipv6=ResMgr::Query("torrent:ipv6",0); if(!*ipv6) { ipv6=FindGlobalIPv6Address(); if(ipv6) { ProtoLog::LogNote(9,"found IPv6 address: %s",ipv6); ResMgr::Set("torrent:ipv6",0,ipv6); } } #endif } xstring Torrent::my_peer_id; xstring Torrent::my_key; unsigned Torrent::my_key_num; xmap Torrent::torrents; SMTaskRef Torrent::listener; SMTaskRef Torrent::listener_udp; SMTaskRef Torrent::dht; #if INET6 SMTaskRef Torrent::listener_ipv6; SMTaskRef Torrent::listener_ipv6_udp; SMTaskRef Torrent::dht_ipv6; #endif SMTaskRef Torrent::fd_cache; Ref Torrent::black_list; void Torrent::StartDHT() { if(!ResMgr::QueryBool("torrent:use-dht",0)) { StopDHT(); StopListenerUDP(); return; } if(dht) return; StartListenerUDP(); const char *home=get_lftp_cache_dir(); const char *nodename=get_nodename(); mkdir(xstring::format("%s/DHT",home),0700); const char *ip=ResMgr::Query("torrent:ip",0); if(!ip || !ip[0]) ip="127.0.0.1"; sockaddr_compact ip_packed; ip_packed.get_space(4); inet_pton(AF_INET,ip,ip_packed.get_non_const()); ip_packed.set_length(4); xstring node_id; DHT::MakeNodeId(node_id,ip_packed); dht=new DHT(AF_INET,node_id); dht->state_file.setf("%s/DHT/ipv4-%s",home,nodename); if(listener_udp->GetPort()) dht->Load(); #if INET6 ip=ResMgr::Query("torrent:ipv6",0); if(!ip || !ip[0]) ip="::1"; ip_packed.get_space(16); inet_pton(AF_INET6,ip,ip_packed.get_non_const()); ip_packed.set_length(16); DHT::MakeNodeId(node_id,ip_packed); dht_ipv6=new DHT(AF_INET6,node_id); dht_ipv6->state_file.setf("%s/DHT/ipv6-%s",home,nodename); if(listener_ipv6_udp->GetPort()) dht_ipv6->Load(); #endif } void Torrent::StopDHT() { if(!dht) return; dht->Save(); dht=0; #if INET6 dht_ipv6->Save(); dht_ipv6=0; #endif } void Torrent::StartListener() { if(listener) return; listener=new TorrentListener(AF_INET); listener->Roll(); // try to allocate ipv4 port first #if INET6 listener_ipv6=new TorrentListener(AF_INET6); #endif } void Torrent::StopListener() { listener=0; #if INET6 listener_ipv6=0; #endif } void Torrent::StartListenerUDP() { if(listener_udp) return; listener_udp=new TorrentListener(AF_INET,SOCK_DGRAM); #if INET6 listener_ipv6_udp=new TorrentListener(AF_INET6,SOCK_DGRAM); #endif } void Torrent::StopListenerUDP() { listener_udp=0; #if INET6 listener_ipv6_udp=0; #endif } Torrent::Torrent(const char *mf,const char *c,const char *od) : metainfo_url(mf), pieces_timer(10), cwd(c), output_dir(od), rate_limit(mf), seed_timer("torrent:seed-max-time",0), timeout_timer("torrent:timeout",0), optimistic_unchoke_timer(30), peers_scan_timer(1), am_interested_timer(1), shutting_down_timer(60), dht_announce_timer(10*60), dht_announce_count(0), dht_announce_count_ipv6(0) { shutting_down=false; complete=false; end_game=false; is_private=false; validating=false; force_valid=false; build_md=false; stop_if_complete=false; stop_if_known=false; md_saved=false; validate_index=0; metadata_size=0; info=0; pieces=0; piece_length=0; total_pieces=0; last_piece_length=0; total_length=0; total_sent=0; total_recv=0; total_left=0; complete_pieces=0; connected_peers_count=0; active_peers_count=0; complete_peers_count=0; am_interested_peers_count=0; am_not_choking_peers_count=0; max_peers=60; seed_min_peers=3; stop_on_ratio=2; stop_min_ppr=1, last_piece=TorrentPeer::NO_PIECE; min_piece_sources=0; avg_piece_sources=0; pieces_available_pct=0; current_min_ppr=0; current_max_ppr=0; Reconfig(0); if(!my_peer_id) { my_peer_id.set("-lftp47-"); my_peer_id.appendf("%04x",(unsigned)getpid() & 0xffff); my_peer_id.appendf("%08x",(unsigned)now.UnixTime()); assert(my_peer_id.length()==PEER_ID_LEN); } if(!my_key) { for(int i=0; i<10; i++) my_key.appendf("%02x",unsigned(random()/13%256)); my_key_num=random(); } dht_announce_timer.Stop(); } Torrent::~Torrent() { } bool Torrent::TrackersDone() const { if(shutting_down && shutting_down_timer.Stopped()) return true; for(int i=0; iIsActive()) return false; } return true; } int Torrent::Done() const { return (shutting_down && TrackersDone()); } void Torrent::ShutdownTrackers() const { for(int i=0; iShutdown(); } } void Torrent::Shutdown() { if(shutting_down) return; Enter(this); LogNote(3,"Shutting down..."); shutting_down=true; shutting_down_timer.Reset(); ShutdownTrackers(); DenounceDHT(); PrepareToDie(); Leave(); } void Torrent::AddTorrent(Torrent *t) { if(FindTorrent(t->GetInfoHash())) return; if(GetTorrentsCount()==0) { StartListener(); StartDHT(); } torrents.add(t->GetInfoHash(),t); } void Torrent::RemoveTorrent(Torrent *t) { if(t!=FindTorrent(t->info_hash)) return; torrents.remove(t->GetInfoHash()); if(GetTorrentsCount()==0) { StopListener(); StopDHT(); StopListenerUDP(); fd_cache=0; black_list=0; } } void Torrent::PrepareToDie() { metainfo_copy=0; building=0; peers.unset(); if(info_hash && this==FindTorrent(info_hash)) RemoveTorrent(this); } void Torrent::SetError(Error *e) { if(invalid_cause) return; invalid_cause=e; LogError(0,"%s: %s", invalid_cause->IsFatal()?"Fatal error":"Transient error", invalid_cause->Text()); Shutdown(); } void Torrent::SetError(const char *msg) { SetError(Error::Fatal(msg)); } double Torrent::GetRatio() const { if(total_sent==0 || total_length==total_left) return 0; return total_sent/double(total_length-total_left); } void Torrent::SetDownloader(unsigned piece,unsigned block,const TorrentPeer *o,const TorrentPeer *n) { piece_info[piece].set_downloader(block,o,n,BlocksInPiece(piece)); } void Torrent::AccountSend(unsigned p,unsigned len) { total_sent+=len; send_rate.Add(len); piece_info[p].add_ratio(float(len)/PieceLength(p)); } void Torrent::AccountRecv(unsigned p,unsigned len) { total_recv+=len; recv_rate.Add(len); } BeNode *Torrent::Lookup(xmap_p& dict,const char *name,BeNode::be_type_t type) { BeNode *node=dict.lookup(name); if(!node) { SetError(xstring::format("Meta-data: `%s' key missing",name)); return 0; } if(node->type!=type) { SetError(xstring::format("Meta-data: wrong `%s' type, must be %s",name,BeNode::TypeName(type))); return 0; } return node; } void Torrent::InitTranslation() { const char *charset="UTF-8"; // default recv_translate_utf8=new DirectedBuffer(DirectedBuffer::GET); recv_translate_utf8->SetTranslation(charset,true); if(metainfo_tree) { BeNode *b_charset=metainfo_tree->lookup("encoding",BeNode::BE_STR); if(b_charset) charset=b_charset->str; } recv_translate=new DirectedBuffer(DirectedBuffer::GET); recv_translate->SetTranslation(charset,true); } void Torrent::TranslateString(BeNode *node) const { if(node->str_lc) return; recv_translate->ResetTranslation(); recv_translate->PutTranslated(node->str); node->str_lc.nset(recv_translate->Get(),recv_translate->Size()); recv_translate->Skip(recv_translate->Size()); } void Torrent::TranslateStringFromUTF8(BeNode *node) const { if(node->str_lc) return; const Ref& tr=recv_translate_utf8; tr->ResetTranslation(); tr->PutTranslated(node->str); node->str_lc.nset(tr->Get(),tr->Size()); tr->Skip(tr->Size()); } void Torrent::SHA1(const xstring& str,xstring& buf) { buf.get_space(SHA1_DIGEST_SIZE); sha1_buffer(str.get(),str.length(),buf.get_non_const()); buf.set_length(SHA1_DIGEST_SIZE); } void Torrent::ValidatePiece(unsigned p) { const xstring& buf=Torrent::RetrieveBlock(p,0,PieceLength(p)); bool valid=false; if(buf.length()==PieceLength(p)) { xstring& sha1=xstring::get_tmp(); SHA1(buf,sha1); if(building) { building->SetPiece(p,sha1); valid=true; } else { valid=!memcmp(pieces->get()+p*SHA1_DIGEST_SIZE,sha1,SHA1_DIGEST_SIZE); } } if(!valid) { if(building) { SetError("File validation error"); return; } if(buf.length()==PieceLength(p)) LogError(11,"piece %u digest mismatch",p); if(my_bitfield->get_bit(p)) { total_left+=PieceLength(p); complete_pieces--; my_bitfield->set_bit(p,0); } SetBlocksAbsent(p); } else { LogNote(11,"piece %u ok",p); if(!my_bitfield->get_bit(p)) { total_left-=PieceLength(p); complete_pieces++; my_bitfield->set_bit(p,1); piece_info[p].free_block_map(); } } } template static inline int cmp(T a,T b) { if(a>b) return 1; if(apiece_info[*a].get_sources_count(); int rb=cmp_torrent->piece_info[*b].get_sources_count(); int c=cmp(ra,rb); if(c) return c; return cmp(*a,*b); } int Torrent::PeersCompareActivity(const SMTaskRef *p1,const SMTaskRef *p2) { TimeDiff i1((*p1)->activity_timer.TimePassed()); TimeDiff i2((*p2)->activity_timer.TimePassed()); return cmp(i1.Seconds(),i2.Seconds()); } int Torrent::PeersCompareRecvRate(const SMTaskRef *p1,const SMTaskRef *p2) { float r1((*p1)->peer_recv_rate.Get()); float r2((*p2)->peer_recv_rate.Get()); int c=cmp(r1,r2); if(c) return c; return PeersCompareSendRate(p1,p2); } int Torrent::PeersCompareSendRate(const SMTaskRef *p1,const SMTaskRef *p2) { float r1((*p1)->peer_send_rate.Get()); float r2((*p2)->peer_send_rate.Get()); return cmp(r1,r2); } bool Torrent::SeededEnough() const { return (stop_on_ratio>0 && GetRatio()>=stop_on_ratio && GetMinPerPieceRatio()>=stop_min_ppr) || seed_timer.Stopped(); } void Torrent::CleanPeers() { Enter(); // remove uninteresting peers and request more for(int i=0; iActivityTimedOut()) { LogNote(4,"removing uninteresting peer %s (%s)",peer->GetName(),peers[i]->Status()); BlackListPeer(peer,"2h"); peers.remove(i--); } } Leave(); } void Torrent::StartTrackers() { for(int i=0; iStart(); } } int Torrent::GetPort() { int port=0; if(listener && !port) port=listener->GetPort(); #if INET6 if(listener_ipv6 && !port) port=listener_ipv6->GetPort(); #endif if(listener_udp && !port) return listener_udp->GetPort(); #if INET6 if(listener_ipv6_udp && !port) port=listener_ipv6_udp->GetPort(); #endif return port; } void Torrent::AnnounceDHT() { if(is_private) return; CleanPeers(); if(dht) dht->AnnouncePeer(this); #if INET6 if(dht_ipv6) dht_ipv6->AnnouncePeer(this); #endif dht_announce_timer.Reset(); } void Torrent::DenounceDHT() { if(is_private) return; if(dht) dht->DenouncePeer(this); #if INET6 if(dht_ipv6) dht_ipv6->DenouncePeer(this); #endif } void Torrent::DHT_Announced(int af) { if(af==AF_INET) dht_announce_count++; #if INET6 else if(af==AF_INET6) dht_announce_count_ipv6++; #endif } const char *Torrent::DHT_Status() const { if(!HasDHT() || Private()) return ""; static xstring status; status.nset("",0); if(dht_announce_count || dht_announce_count_ipv6) { status.append(_("announced via ")); if(dht_announce_count) status.appendf("ipv4:%d",dht_announce_count); #if INET6 if(dht_announce_count_ipv6) { if(dht_announce_count) status.append(", "); status.appendf("ipv6:%d",dht_announce_count_ipv6); } #endif } if(!dht_announce_timer.Stopped() && !validating) { if(status.length()>0) status.append("; "); status.appendf(_("next announce in %s"), dht_announce_timer.TimeLeft().toString(TimeInterval::TO_STR_TRANSLATE)); } return status; } const char *Torrent::GetMetadataPath() const { if(!QueryBool("torrent:save-metadata",0)) return NULL; xstring& path=xstring::cat(get_lftp_data_dir(),"/torrent",NULL); mkdir(path,0700); path.append("/md"); mkdir(path,0700); path.append('/'); info_hash.hexdump_to(path); return path; } bool Torrent::SaveMetadata() const { if(md_saved) return true; // saved already. const char *path=GetMetadataPath(); if(!path) return false; int fd=open(path,O_CREAT|O_WRONLY,0600); if(fd<0) { LogError(9,"open(%s): %s",path,strerror(errno)); return false; } int bytes_to_write=metadata.length(); int res=write(fd,metadata.get(),bytes_to_write); int saved_errno=errno; ftruncate(fd,bytes_to_write); close(fd); if(res==bytes_to_write) return true; // no error, fd is closed. if(res<0) LogError(9,"write(%s): %s",path,strerror(saved_errno)); else LogError(9,"write(%s): short write (only wrote %d bytes)",path,res); return false; } bool Torrent::LoadMetadata(const char *path) { int fd=open(path,O_RDONLY); if(fd<0) { LogError(9,"open(%s): %s",path,strerror(errno)); return false; } struct stat st; if(fstat(fd,&st)==-1) { close(fd); return false; } int bytes_to_read=st.st_size; xstring md; int res=read(fd,md.add_space(bytes_to_read),bytes_to_read); int saved_errno=errno; close(fd); if(res==bytes_to_read) { // no error, fd is closed. md.add_commit(res); xstring new_info_hash; SHA1(md,new_info_hash); if(info_hash && info_hash.ne(new_info_hash)) { LogError(9,"cached metadata does not match info_hash"); return false; } LogNote(9,"got metadata from %s",path); if(SetMetadata(md)) { md_saved=true; return true; } return false; } if(res<0) LogError(9,"read(%s): %s",path,strerror(saved_errno)); else LogError(9,"read(%s): short read (only read %d bytes)",path,res); return false; } void Torrent::FetchMetadataFromURL(const char *url) { ParsedURL u(url,true); if(!u.proto) { u.proto.set("file"); u.path.set(url); // undo %xx translation } LogNote(9,"Retrieving meta-data from `%s'...\n",url); FileCopyPeer *metainfo_src=new FileCopyPeerFA(&u,FA::RETRIEVE); FileCopyPeer *metainfo_dst=new FileCopyPeerMemory(10000000); metainfo_copy=new FileCopy(metainfo_src,metainfo_dst,false); } void Torrent::StartMetadataDownload() { const char *path=GetMetadataPath(); if(path && access(path,R_OK)>=0) { // we have the metadata cached if(LoadMetadata(path)) { if(stop_if_known) { LogNote(2,"found cached metadata, stopping"); Shutdown(); } else { Startup(); } return; } } md_download.nset("",0); // add torrent without metadata to announce it and get peers to get MD from. AddTorrent(this); } void Torrent::SetTotalLength(off_t tl) { total_length=tl; LogNote(4,"Total length is %llu",total_length); total_left=total_length; last_piece_length=total_length%piece_length; if(last_piece_length==0) last_piece_length=piece_length; total_pieces=(total_length+piece_length-1)/piece_length; my_bitfield=new BitField(total_pieces); blocks_in_piece=(piece_length+BLOCK_SIZE-1)/BLOCK_SIZE; blocks_in_last_piece=(last_piece_length+BLOCK_SIZE-1)/BLOCK_SIZE; piece_info=new TorrentPiece[total_pieces](); } void Torrent::StartValidating() { validate_index=0; validating=true; recv_rate.Reset(); } bool Torrent::SetMetadata(const xstring& md) { metadata.set(md); timeout_timer.Reset(); xstring new_info_hash; SHA1(metadata,new_info_hash); if(info_hash && info_hash.ne(new_info_hash)) { metadata.unset(); SetError("metadata does not match info_hash"); return false; } info_hash.set(new_info_hash); if(!info) { int rest; info=BeNode::Parse(metadata,metadata.length(),&rest); if(!info) { SetError("cannot parse metadata"); return false; } xmap_p d; d.add("info",info); metainfo_tree=new BeNode(&d); InitTranslation(); } BeNode *b_piece_length=Lookup(info,"piece length",BeNode::BE_INT); if(!b_piece_length || b_piece_length->num<1024 || b_piece_length->num>INT_MAX/4) { SetError("Meta-data: invalid piece length"); return false; } piece_length=b_piece_length->num; LogNote(4,"Piece length is %u",piece_length); BeNode *b_name=info->lookup("name",BeNode::BE_STR); BeNode *b_name_utf8=info->lookup("name.utf-8",BeNode::BE_STR); if(b_name_utf8) { TranslateStringFromUTF8(b_name_utf8); name.set(b_name_utf8->str_lc); } else if(b_name) { TranslateString(b_name); name.set(b_name->str_lc); } else { name.truncate(); info_hash.hexdump_to(name); } Reconfig(0); BeNode *files=info->lookup("files"); if(!files) { BeNode *length=Lookup(info,"length",BeNode::BE_INT); if(!length || length->num<0) { SetError("Meta-data: invalid or missing length"); return false; } total_length=length->num; } else { if(files->type!=BeNode::BE_LIST) { SetError("Meta-data: wrong `info/files' type, must be LIST"); return false; } total_length=0; for(int i=0; ilist.length(); i++) { if(files->list[i]->type!=BeNode::BE_DICT) { SetError(xstring::format("Meta-data: wrong `info/files[%d]' type, must be LIST",i)); return false; } BeNode *f=Lookup(files->list[i]->dict,"length",BeNode::BE_INT); if(!f || f->num<0) { SetError("Meta-data: invalid or missing file length"); return false; } if(!Lookup(files->list[i]->dict,"path",BeNode::BE_LIST)) { SetError("Meta-data: file path missing"); return false; } total_length+=f->num; } } this->files=new TorrentFiles(files,this); SetTotalLength(total_length); BeNode *b_pieces=Lookup(info,"pieces",BeNode::BE_STR); if(!b_pieces) { SetError("Meta-data: `pieces' missing"); return false; } pieces=&b_pieces->str; if(pieces->length()!=SHA1_DIGEST_SIZE*total_pieces) { SetError("Meta-data: invalid `pieces' length"); return false; } is_private=info->lookup_int("private"); return true; } void Torrent::Startup() { if(!info_hash || !metadata) SetError("missing metadata"); if(shutting_down) return; Torrent *other=FindTorrent(info_hash); if(other) { if(other!=this) { SetError("This torrent is already running"); return; } } else { AddTorrent(this); } if(!building) md_saved=SaveMetadata(); if(!force_valid && !building) { StartValidating(); } else { my_bitfield->set_range(0,total_pieces,1); complete_pieces=total_pieces; total_left=0; complete=true; seed_timer.Reset(); dht_announce_timer.Stop(); } RestartPeers(); } void Torrent::RestartPeers() { for(int i=0; iRestart(); } static int base32_char_to_value(char c) { if(c>='a' && c<='z') return c-'a'; if(c>='A' && c<='Z') return c-'A'; if(c>='2' && c<='7') return c-'2'+26; if(c=='=') return 0; return -1; } void base32_decode(const char *base32,xstring& out) { unsigned data=0; int data_bits=0; int pad_bits=0; while(*base32) { char c=*base32++; if(c=='=' && data_bits<=pad_bits) return; if(pad_bits>0 && c!='=') return; int v=base32_char_to_value(c); if(v==-1) return; data|=((v&31)<<(11-data_bits)); data_bits+=5; if(c=='=') pad_bits+=5; if(data_bits>=8) { out.append(char((data>>8)&255)); data<<=8; data_bits-=8; } } if(data_bits>0) out.append(char((data>>8)&255)); } void Torrent::ParseMagnet(const char *m0) { char *m=alloca_strdup(m0); for(char *p=strtok(m,"&"); p; p=strtok(NULL,"&")) { char *v=strchr(p,'='); if(!v) continue; *v++=0; v=xstring::get_tmp(v).url_decode(URL_DECODE_PLUS).get_non_const(); if(!strcmp(p,"xt")) { if(strncmp(v,"urn:btih:",9)) { SetError("Only BitTorrent magnet links are supported"); return; } v+=9; xstring& btih=xstring::get_tmp(v); if(btih.length()==40) { btih.hex_decode(); if(btih.length()!=20) { SetError("Invalid value of urn:btih in magnet link"); return; } info_hash.move_here(btih); } else { info_hash.truncate(); base32_decode(v,info_hash); if(info_hash.length()!=20) { info_hash.unset(); SetError("Invalid value of urn:btih in magnet link"); return; } } } else if(!strcmp(p,"tr")) { SMTaskRef new_tracker(new TorrentTracker(this,v)); if(!new_tracker->Failed()) { new_tracker->tracker_no=trackers.count(); trackers.append(new_tracker.borrow()); } } else if(!strcmp(p,"dn")) { name.set(v); } } if(!info_hash) { SetError("missing urn:btih in magnet link"); return; } if(FindTorrent(info_hash)) { SetError("This torrent is already running"); return; } StartMetadataDownload(); } void Torrent::CalcPiecesStats() { min_piece_sources=INT_MAX; avg_piece_sources=0; pieces_available_pct=0; for(unsigned i=0; iget_bit(i)) continue; unsigned sc=piece_info[i].get_sources_count(); if(min_piece_sources>sc) min_piece_sources=sc; if(sc==0) continue; pieces_available_pct++; avg_piece_sources+=sc; } avg_piece_sources=avg_piece_sources*256/(total_pieces-complete_pieces); pieces_available_pct=pieces_available_pct*100/(total_pieces-complete_pieces); CalcPerPieceRatio(); } void Torrent::CalcPerPieceRatio() { current_min_ppr=1024; current_max_ppr=0; for(unsigned i=0; ippr) current_min_ppr=ppr; if(current_max_pprget_bit(i)) { if(!piece_info[i].has_a_downloader()) enter_end_game=false; if(piece_info[i].has_no_sources()) continue; pieces_needed.append(i); } piece_info[i].cleanup(); } if(!end_game && enter_end_game) { LogNote(1,"entering End Game mode"); end_game=true; } cmp_torrent=this; pieces_needed.qsort(PiecesNeededCmp); CalcPiecesStats(); pieces_timer.Reset(); } TorrentBuild::TorrentBuild(const char *path) : top_path(path), name(basename_ptr(path)), done(false), total_length(0), piece_length(0) { name.rtrim('/'); struct stat st; if(stat(path,&st)==-1) { error=SysError(); return; } if(S_ISREG(st.st_mode)) { total_length=st.st_size; LogNote(10,"single file %s, size %lld",path,(long long)st.st_size); Finish(); return; } if(!S_ISDIR(st.st_mode)) { error=new Error(-1,"Need a plain file or directory",true); return; } QueueDir(""); } const char *TorrentBuild::lc_to_utf8(const char *s) { if(!translate || !s) return s; translate->ResetTranslation(); translate->PutTranslated(s); int len; translate->Get(&s,&len); translate->Skip(len); return xstring::get_tmp(s,len); } void TorrentBuild::Finish() { done=true; LogNote(10,"scan finished, total_length=%lld",(long long)total_length); translate=new DirectedBuffer(DirectedBuffer::PUT); translate->SetTranslation("UTF-8",false); xmap_p *b_info=new xmap_p(); b_info->add("name",new BeNode(lc_to_utf8(name))); piece_length=16*1024; off_t length_scan=piece_length*2200; while(length_scan<=total_length) { piece_length*=2; length_scan*=2; } b_info->add("piece length",new BeNode(piece_length)); if(files.count()==0) { b_info->add("length",new BeNode(total_length)); } else { files.Sort(FileSet::BYNAME); files.rewind(); xarray_p *b_files=new xarray_p(); for(FileInfo *fi=files.curr(); fi; fi=files.next()) { xarray_p *path=new xarray_p(); const char *name_utf8=lc_to_utf8(fi->name); char *p=alloca_strdup(name_utf8); for(p=strtok(p,"/"); p; p=strtok(NULL,"/")) path->append(new BeNode(p)); xmap_p *b_file=new xmap_p(); b_file->add("path",new BeNode(path)); b_file->add("length",new BeNode(fi->size)); b_files->append(new BeNode(b_file)); } b_info->add("files",new BeNode(b_files)); } info=new BeNode(b_info); } void TorrentBuild::SetPiece(unsigned p,const xstring& sha) { assert(pieces.length()==p*20); // require sequential building pieces.append(sha); } const xstring& TorrentBuild::GetMetadata() { info->dict.add("pieces",new BeNode(pieces)); return info->Pack(); } const xstring& TorrentBuild::Status() const { if(Done()) return xstring::get_tmp(""); const char *curr=CurrPath(); int count=files.count(); if(curr[0]) return xstring::format(plural("%d file$|s$ found, now scanning %s",count),count,curr); return xstring::format(plural("%d file$|s$ found",count),count); } int TorrentBuild::Do() { int m=STALL; if(Done()) return m; const char *curr_path=CurrPath(); if(!curr_path) { Finish(); return MOVED; } const char *full_path=dir_file(top_path,curr_path); full_path=alloca_strdup(full_path); DIR *dir=opendir(full_path); if(!dir) { if(NonFatalError(errno)) return m; if(dirs_to_scan.Count()<=1) error=SysError(); else LogError(0,"opendir(%s): %s",full_path,strerror(errno)); NextDir(); return MOVED; } LogNote(10,"scanning %s",full_path); struct dirent *entry; while((entry=readdir(dir))!=0) { if(!strcmp(entry->d_name,".") || !strcmp(entry->d_name,"..")) continue; struct stat st; const char *path=dir_file(full_path,entry->d_name); if(lstat(path,&st)==-1) { LogError(0,"stat(%s): %s",path,strerror(errno)); continue; } if(S_ISREG(st.st_mode)) AddFile(dir_file(curr_path,entry->d_name),&st); else if(S_ISDIR(st.st_mode)) QueueDir(dir_file(curr_path,entry->d_name)); else LogNote(10,"ignoring %s (not a directory nor a plain file)",path); } closedir(dir); NextDir(); return MOVED; } void TorrentBuild::AddFile(const char *path,struct stat *st) { FileInfo *fi=new FileInfo(path); fi->SetSize(st->st_size); files.Add(fi); total_length+=st->st_size; LogNote(10,"adding %s, size %lld",path,(long long)fi->size); } int Torrent::Do() { int m=STALL; if(Done() || shutting_down) return m; if(!complete && !building && timeout_timer.Stopped()) { SetError("timed out with no progress"); return MOVED; } if(build_md && !files) { if(!building) building=new TorrentBuild(metainfo_url); if(!building->Done()) return m; m=MOVED; if(building->Failed()) { SetError(building->ErrorText()); return m; } InitTranslation(); name.set(building->name); piece_length=building->piece_length; SetTotalLength(building->total_length); files=new TorrentFiles(building->GetFilesNode(),this); output_dir.set(building->GetBaseDirectory()); StartValidating(); } if(!metainfo_tree && metainfo_url && !md_download && !build_md) { // retrieve metainfo if don't have already. if(!metainfo_copy) { if(metainfo_url.begins_with("magnet:?")) { ParseMagnet(metainfo_url+8); return MOVED; } if(metainfo_url.length()==40 && strspn(metainfo_url,"0123456789ABCDEFabcdef")==40 && access(metainfo_url,0)==-1) { xstring& btih=xstring::get_tmp(metainfo_url); btih.hex_decode(); assert(btih.length()==20); info_hash.move_here(btih); if(FindTorrent(info_hash)) { SetError("This torrent is already running"); return MOVED; } StartMetadataDownload(); return MOVED; } FetchMetadataFromURL(metainfo_url); m=MOVED; } if(metainfo_copy->Error()) { SetError(Error::Fatal(metainfo_copy->ErrorText())); metainfo_copy=0; return MOVED; } if(!metainfo_copy->Done()) return m; LogNote(9,"meta-data EOF\n"); int rest; const char *metainfo_buf; int metainfo_len; metainfo_copy->put->Get(&metainfo_buf,&metainfo_len); metainfo_tree=BeNode::Parse(metainfo_buf,metainfo_len,&rest); metainfo_copy=0; if(!metainfo_tree) { SetError("Meta-data parse error"); return MOVED; } if(rest>0) { SetError("Junk at the end of Meta-data"); return MOVED; } InitTranslation(); LogNote(10,"Received meta-data:"); Log::global->Write(10,metainfo_tree->Format()); if(metainfo_tree->type!=BeNode::BE_DICT) { SetError("Meta-data: wrong top level type, must be DICT"); return MOVED; } const char *retracker=ResMgr::Query("torrent:retracker",GetName()); int retracker_len=xstrlen(retracker); BeNode *announce_list=metainfo_tree->lookup("announce-list",BeNode::BE_LIST); if(announce_list) { for(int i=0; ilist.length(); i++) { BeNode *announce_list1=announce_list->list[i]; if(announce_list1->type!=BeNode::BE_LIST) continue; SMTaskRef new_tracker; for(int j=0; jlist.length(); j++) { BeNode *announce=announce_list1->list[j]; if(announce->type!=BeNode::BE_STR) continue; if(retracker_len && !strncmp(retracker,announce->str,retracker_len)) retracker=0, retracker_len=0; if(!new_tracker) new_tracker=new TorrentTracker(this,announce->str); else new_tracker->AddURL(announce->str); } if(new_tracker && !new_tracker->Failed()) { new_tracker->tracker_no=trackers.count(); trackers.append(new_tracker.borrow()); } } } if(trackers.count()==0) { const xstring& announce=metainfo_tree->lookup_str("announce"); if(announce) { SMTaskRef new_tracker( new TorrentTracker(this,announce)); if(!new_tracker->Failed()) trackers.append(new_tracker.borrow()); } } if(retracker_len) { SMTaskRef new_tracker(new TorrentTracker(this,retracker)); if(!new_tracker->Failed()) { new_tracker->tracker_no=trackers.count(); trackers.append(new_tracker.borrow()); } } BeNode *nodes=metainfo_tree->lookup("nodes",BeNode::BE_LIST); if(nodes && dht) { for(int i=0; ilist.count(); i++) { BeNode *n=nodes->list[i]; if(n->type!=BeNode::BE_LIST || n->list.count()<2) continue; BeNode *b_host=n->list[0]; BeNode *b_port=n->list[1]; if(b_host->type!=BeNode::BE_STR || b_port->type!=BeNode::BE_INT) continue; if(b_port->num<=0 || b_port->num>=65535) continue; ParsedURL u; u.host.set(b_host->str); u.port.set(xstring::format("%u",(unsigned)b_port->num)); xstring_ca node(u.Combine()); dht->AddBootstrapNode(node); #if INET6 dht_ipv6->AddBootstrapNode(node); #endif } } info=Lookup(metainfo_tree,"info",BeNode::BE_DICT); if(!info) return MOVED; if(SetMetadata(info->str)) Startup(); m=MOVED; if(Done()) return m; } if(peers_scan_timer.Stopped()) ScanPeers(); if(validating) { ValidatePiece(validate_index++); if(validate_indexGetMetadata())) return MOVED; building=0; xstring magnet("magnet:?xt=urn:btih:"); magnet.append(info_hash.hexdump()); magnet.appendf("&xl=%lld",(long long)total_length); magnet.append("&dn="); magnet.append_url_encoded(name,URL_PATH_UNSAFE); printf("%s\n",magnet.get()); Startup(); } RestartPeers(); dht_announce_timer.Stop(); } if(GetPort()) StartTrackers(); if(dht_announce_timer.Stopped()) AnnounceDHT(); // count peers connected_peers_count=0; active_peers_count=0; complete_peers_count=0; for(int i=0; iConnected(); active_peers_count+=peers[i]->Active(); complete_peers_count+=peers[i]->Complete(); } if(!metadata) return m; if(optimistic_unchoke_timer.Stopped()) OptimisticUnchoke(); // rebuild lists of needed pieces if(!complete && (pieces_needed.count()==0 || pieces_timer.Stopped())) RebuildPiecesNeeded(); if(complete) { if(pieces_timer.Stopped()) { CalcPerPieceRatio(); pieces_timer.Reset(); } if(SeededEnough()) { Shutdown(); return MOVED; } } return m; } void Torrent::BlackListPeer(const TorrentPeer *peer,const char *timeout) { if(peer->IsPassive() || GetTorrentsCount()==0) return; if(!black_list) black_list=new TorrentBlackList(); black_list->Add(peer->GetAddress(),timeout); } bool Torrent::BlackListed(const TorrentPeer *peer) { return black_list && black_list->Listed(peer->GetAddress()); } bool Torrent::CanAccept() const { return !validating && decline_timer.Stopped(); } void Torrent::Accept(int s,const sockaddr_u *addr,IOBuffer *rb) { if(!CanAccept()) { LogNote(4,"declining new connection"); Delete(rb); close(s); return; } TorrentPeer *p=new TorrentPeer(this,addr,TorrentPeer::TR_ACCEPTED); p->Connect(s,rb); AddPeer(p); } void Torrent::AddPeer(TorrentPeer *peer) { if(BlackListed(peer)) { Delete(peer); return; } for(int i=0; iAddressEq(peer)) { if(peer->Connected() && !peers[i]->Connected()) { peers[i]=peer; } else { Delete(peer); } return; } } peers.append(peer); } int Torrent::GetWantedPeersCount() const { int numwant=complete?seed_min_peers:max_peers/2; if(numwant>peers.count()) numwant-=peers.count(); else numwant=0; if(shutting_down) numwant=-1; if(numwant>1) { // count almost ready for request trackers int trackers_count=0; for(int i=0; itracker_timer.TimeLeft()<60); // request peers from all trackers evenly (round up). if(trackers_count) numwant=(numwant+trackers_count-1)/trackers_count; } return numwant; } const char *Torrent::MakePath(BeNode *p) const { BeNode *path=p->lookup("path.utf-8",BeNode::BE_LIST); void (Torrent::*tr)(BeNode*)const=&Torrent::TranslateStringFromUTF8; if(!path) { path=p->lookup("path",BeNode::BE_LIST); tr=&Torrent::TranslateString; } static xstring buf; buf.set(name); if(buf.eq("..") || buf[0]=='/') { buf.set_substr(0,0,"_",1); } for(int i=0; ilist.count(); i++) { BeNode *e=path->list[i]; if(e->type==BeNode::BE_STR) { (this->*tr)(e); buf.append('/'); if(e->str_lc.eq("..")) buf.append('_'); buf.append(e->str_lc); } } return buf; } const char *Torrent::FindFileByPosition(unsigned piece,unsigned begin,off_t *f_pos,off_t *f_tail) const { off_t target_pos=(off_t)piece*piece_length+begin; TorrentFile *file=files->FindByPosition(target_pos); if(!file) return 0; *f_pos=target_pos-file->pos; *f_tail=file->length-*f_pos; return file->path; } TorrentFiles::TorrentFiles(const BeNode *files,const Torrent *t) { if(!files) { grow_space(1); set_length(1); file(0)->set(t->GetName(),0,t->TotalLength()); } else { int count=files->list.length(); grow_space(count); set_length(count); off_t scan_pos=0; for(int i=0; ilist[i]; off_t file_length=node->lookup_int("length"); file(i)->set(t->MakePath(node),scan_pos,file_length); scan_pos+=file_length; } } qsort(pos_cmp); } TorrentFile *TorrentFiles::FindByPosition(off_t pos) { int i=0; int j=length()-1; while(i<=j) { // invariant: the target element is in the range [i,j] int m=(i+j)/2; if(file(m)->contains_pos(pos)) return file(m); if(file(m)->pos>pos) j=m-1; else i=m+1; } return 0; } FDCache::FDCache() : clean_timer(1) { max_count=16; max_time=30; } FDCache::~FDCache() { CloseAll(); } void FDCache::Clean() { for(int i=0; i<3; i++) { xmap& cache=this->cache[i]; for(const FD *f=&cache.each_begin(); f->last_used; f=&cache.each_next()) { if(f->fd==-1) { if(f->last_used+1last_used+max_timefd); cache.remove(cache.each_key()); } } } while(Count()>max_count && CloseOne()) /*empty*/; if(Count()>0) clean_timer.Reset(); } int FDCache::Do() { if(clean_timer.Stopped()) Clean(); return STALL; } void FDCache::Close(const char *name) { const xstring &n=xstring::get_tmp(name); for(int i=0; i<3; i++) { const FD& f=cache[i].lookup(n); if(f.last_used!=0) { if(f.fd!=-1) { ProtoLog::LogNote(9,"closing %s",name); #ifdef HAVE_POSIX_FADVISE if(i==O_RDONLY) // avoid filling up the cache posix_fadvise(f.fd,0,0,POSIX_FADV_DONTNEED); #endif close(f.fd); } cache[i].remove(n); } } } void FDCache::CloseAll() { for(int i=0; i<3; i++) { xmap& cache=this->cache[i]; for(const FD *f=&cache.each_begin(); f->last_used; f=&cache.each_next()) { if(f->fd!=-1) { ProtoLog::LogNote(9,"closing %s",cache.each_key().get()); close(f->fd); } cache.remove(cache.each_key()); } } } bool FDCache::CloseOne() { int oldest_mode=0; int oldest_fd=-1; int oldest_time=0; const xstring *oldest_key=0; for(int i=0; i<3; i++) { xmap& cache=this->cache[i]; for(const FD *f=&cache.each_begin(); f->last_used; f=&cache.each_next()) { if(f->fd==-1) continue; if(oldest_key==0 || f->last_usedlast_used; oldest_fd=f->fd; oldest_mode=i; } } } if(!oldest_key) return false; if(oldest_fd!=-1) { ProtoLog::LogNote(9,"closing %s",oldest_key->get()); close(oldest_fd); } cache[oldest_mode].remove(*oldest_key); return true; } int FDCache::Count() const { return cache[0].count()+cache[1].count()+cache[2].count(); } int FDCache::OpenFile(const char *file,int m,off_t size) { int ci=m&3; assert(ci<3); FD& f=cache[ci].lookup_Lv(file); if(f.last_used!=0) { if(f.fd!=-1) f.last_used=now.UnixTime(); else errno=f.saved_errno; return f.fd; } if(ci==O_RDONLY) { // O_RDWR also will do, check if we have it. const FD& f_rw=cache[O_RDWR].lookup(file); if(f_rw.last_used!=0 && f_rw.fd!=-1) { // don't update last_used to expire it and reopen with proper mode return f_rw.fd; } } Clean(); clean_timer.Reset(); ProtoLog::LogNote(9,"opening %s",file); int fd; do { fd=open(file,m,0664); } while(fd==-1 && (errno==EMFILE || errno==ENFILE) && CloseOne()); FD new_entry = {fd,errno,now.UnixTime()}; cache[ci].add(file,new_entry); if(fd!=-1) fcntl(fd,F_SETFD,FD_CLOEXEC); if(fd==-1 || size==0) return fd; if(ci==O_RDWR && QueryBool("file:use-fallocate",0)) { struct stat st; // check if it is newly created file, then allocate space if(fstat(fd,&st)!=-1 && st.st_size==0) { if(lftp_fallocate(fd,size)==-1 && errno!=ENOSYS && errno!=EOPNOTSUPP) { ProtoLog::LogError(9,"space allocation for %s (%lld bytes) failed: %s", file,(long long)size,strerror(errno)); } } } #ifdef HAVE_POSIX_FADVISE if(ci==O_RDONLY) { // validation mode (when validating, size>0) posix_fadvise(fd,0,size,POSIX_FADV_SEQUENTIAL); posix_fadvise(fd,0,size,POSIX_FADV_NOREUSE); } #endif//HAVE_POSIX_FADVISE return fd; } int Torrent::OpenFile(const char *file,int m,off_t size) { bool did_mkdir=false; if(!fd_cache) fd_cache=new FDCache(); try_again: const char *cf=dir_file(output_dir,file); int fd=fd_cache->OpenFile(cf,m,size); while(fd==-1 && (errno==EMFILE || errno==ENFILE) && peers.count()>0) { peers.chop(); // free an fd fd=fd_cache->OpenFile(cf,m,size); } if(validating) return fd; if(fd==-1) fd_cache->Close(cf); // remove negative cache. if(fd==-1 && errno==ENOENT && !did_mkdir) { LogError(10,"open(%s): %s",cf,strerror(errno)); const char *sl=strchr(file,'/'); while(sl) { if(sl>file) { if(mkdir(cf=dir_file(output_dir,xstring::get_tmp(file,sl-file)),0775)==-1 && errno!=EEXIST) LogError(9,"mkdir(%s): %s",cf,strerror(errno)); } sl=strchr(sl+1,'/'); } did_mkdir=true; goto try_again; } return fd; } void Torrent::CloseFile(const char *file) const { if(!fd_cache) return; fd_cache->Close(dir_file(output_dir,file)); } void Torrent::SetPieceNotWanted(unsigned piece) { for(int j=0; jCancelBlock(piece,begin); unsigned b=begin/BLOCK_SIZE; int bc=(len+BLOCK_SIZE-1)/BLOCK_SIZE; off_t f_pos=0; off_t f_rest=len; while(len>0) { const char *file=FindFileByPosition(piece,begin,&f_pos,&f_rest); int fd=OpenFile(file,O_RDWR|O_CREAT,f_pos+f_rest); if(fd==-1) { SetError(xstring::format("open(%s): %s",file,strerror(errno))); return; } int w=pwrite(fd,buf,MIN(f_rest,len),f_pos); int saved_errno=errno; if(w==-1) { SetError(xstring::format("pwrite(%s): %s",file,strerror(saved_errno))); return; } if(w==0) { SetError(xstring::format("pwrite(%s): write error - disk full?",file)); return; } buf+=w; begin+=w; len-=w; } while(bc-->0) { SetBlockPresent(piece,b++); } if(AllBlocksPresent(piece) && !my_bitfield->get_bit(piece)) { ValidatePiece(piece); if(!my_bitfield->get_bit(piece)) { LogError(0,"new piece %u digest mismatch",piece); src_peer->MarkPieceInvalid(piece); return; } LogNote(3,"piece %u complete",piece); timeout_timer.Reset(); SetPieceNotWanted(piece); for(int i=0; iHave(piece); if(my_bitfield->has_all_set() && !complete) { complete=true; seed_timer.Reset(); end_game=false; ScanPeers(); SendTrackersRequest("completed"); recv_rate.Reset(); } } } void Torrent::SendTrackersRequest(const char *event) const { for(int i=0; iFailed()) trackers[i]->SendTrackerRequest(event); } } const xstring& Torrent::RetrieveBlock(unsigned piece,unsigned begin,unsigned len) { static xstring buf; buf.truncate(0); buf.get_space(len); off_t f_pos=0; off_t f_rest=len; while(len>0) { const char *file=FindFileByPosition(piece,begin,&f_pos,&f_rest); int fd=OpenFile(file,O_RDONLY,validating?f_pos+f_rest:0); if(fd==-1) return xstring::null; int w=pread(fd,buf.add_space(len),MIN(f_rest,len),f_pos); if(w==-1) { SetError(xstring::format("pread(%s): %s",file,strerror(errno))); return xstring::null; } if(w==0) { // buf.append_padding(len,'\0'); break; } buf.add_commit(w); begin+=w; len-=w; if(validating && w==f_rest) CloseFile(file); } return buf; } TorrentPeer *Torrent::FindPeerById(const xstring& p_id) { // linear search - peers count<100, called rarely for(int i=0; ipeer_id.eq(p_id)) return const_cast(peer); } return 0; } void Torrent::ScanPeers() { // scan existing peers for(int i=0; iFailed()) { LogError(2,"peer %s failed: %s",peer->GetName(),peer->ErrorText()); } else if(peer->Disconnected() && peer->ActivityTimedOut()) { LogNote(4,"peer %s disconnected",peer->GetName()); } else if(peer->myself) { LogNote(4,"removing myself-connected peer %s",peer->GetName()); blacklist_time="forever"; } else if(peer->duplicate) { LogNote(4,"removing duplicate peer %s",peer->GetName()); } else if(complete && peer->Seed()) { LogNote(4,"removing unneeded peer %s (%s)",peer->GetName(),peers[i]->Status()); blacklist_time="1d"; } else { // keep the peer. continue; } if(blacklist_time) BlackListPeer(peer,blacklist_time); peers.remove(i--); } ReducePeers(); peers_scan_timer.Reset(); } void Torrent::ReducePeers() { if(max_peers>0 && peers.count()>max_peers) { // remove least interesting peers. peers.qsort(PeersCompareActivity); int to_remove=peers.count()-max_peers; while(to_remove-->0) { TimeInterval max_idle(peers.last()->activity_timer.TimePassed()); LogNote(3,"removing peer %s (too many; idle:%s)",peers.last()->GetName(), max_idle.toString(TimeInterval::TO_STR_TERSE+TimeInterval::TO_STR_TRANSLATE)); peers.chop(); if(max_idle<60) decline_timer.Set(60-max_idle.Seconds()); } } peers.qsort(complete ? PeersCompareSendRate : PeersCompareRecvRate); ReduceUploaders(); ReduceDownloaders(); UnchokeBestUploaders(); } void Torrent::ReduceUploaders() { bool rate_low = RateLow(RateLimit::GET); if(am_interested_peers_count < (rate_low?max_uploaders:min_uploaders+1)) return; // make the slowest uninterested for(int i=0; iam_interested) { if(peer->interest_timer.TimePassed() <= 30) break; peer->SetAmInterested(false); if(am_interested_peers_count < max_uploaders) break; } } } void Torrent::ReduceDownloaders() { bool rate_low = RateLow(RateLimit::PUT); if(am_not_choking_peers_count < (rate_low?max_downloaders:min_downloaders+1)) return; // choke the slowest for(int i=0; iam_choking && peer->peer_interested) { if(peer->choke_timer.TimePassed() <= 30) break; peer->SetAmChoking(true); if(am_not_choking_peers_count < max_downloaders) break; } } } bool Torrent::NeedMoreUploaders() { if(!metadata || validating) return false; return RateLow(RateLimit::GET) && am_interested_peers_count < max_uploaders && am_interested_timer.Stopped(); } bool Torrent::AllowMoreDownloaders() { if(!metadata || validating) return false; return RateLow(RateLimit::PUT) && am_not_choking_peers_count < max_downloaders; } void Torrent::UnchokeBestUploaders() { if(!metadata) return; // unchoke 4 best uploaders int limit = 4; int count=0; for(int i=peers.count()-1; i>=0 && countConnected()) continue; if(!peer->choke_timer.Stopped()) continue; // cannot change choke status yet if(!peer->peer_interested) continue; peer->SetAmChoking(false); count++; } } void Torrent::OptimisticUnchoke() { xarray choked_peers; for(int i=peers.count()-1; i>=0; i--) { TorrentPeer *peer=peers[i].get_non_const(); if(!peer->Connected()) continue; if(!peer->choke_timer.Stopped()) continue; // cannot change choke status yet if(peer->am_choking) { if(!peer->peer_interested) { peer->SetAmChoking(false); continue; } choked_peers.append(peer); if(peer->retry_timer.TimePassed()<60) { // newly connected is more likely to be unchoked choked_peers.append(peer); choked_peers.append(peer); } } } if(choked_peers.count()==0) return; choked_peers[rand()/13%choked_peers.count()]->SetAmChoking(false); optimistic_unchoke_timer.Reset(); } int Torrent::PeerBytesAllowed(const TorrentPeer *peer,RateLimit::dir_t dir) { float peer_rate=(dir==RateLimit::GET ? peer->peer_send_rate : peer->peer_recv_rate).Get(); float total_rate=(dir==RateLimit::GET ? send_rate : recv_rate).Get(); const int min_rate = 1024; // the more is the opposite rate the more rate allowed, with a minimum float bytes = rate_limit.BytesAllowed(dir); bytes *= (peer_rate + min_rate) / (total_rate + active_peers_count*min_rate); return (int)bytes; } void Torrent::PeerBytesUsed(int b,RateLimit::dir_t dir) { rate_limit.BytesUsed(b,dir); } void Torrent::Reconfig(const char *name) { const char *c=GetName(); max_peers=ResMgr::Query("torrent:max-peers",c); seed_min_peers=ResMgr::Query("torrent:seed-min-peers",c); stop_on_ratio=ResMgr::Query("torrent:stop-on-ratio",c); stop_min_ppr=ResMgr::Query("torrent:stop-min-ppr",c); rate_limit.Reconfig(name,metainfo_url); if(listener) StartDHT(); } const xstring& Torrent::Status() { if(metainfo_copy) return xstring::format(_("Getting meta-data: %s"),metainfo_copy->GetStatus()); if(validating) { return xstring::format(_("Validation: %u/%u (%u%%) %s%s"),validate_index,total_pieces, validate_index*100/total_pieces,recv_rate.GetStrS(), recv_rate.GetETAStrFromSize((off_t)(total_pieces-validate_index-1)*piece_length+last_piece_length).get()); } if(building) return building->Status(); if(!metadata && !build_md) { if(md_download.length()>0) return xstring::format(_("Getting meta-data: %s"), xstring::format("%u/%u",(unsigned)md_download.length(),(unsigned)metadata_size).get()); else return xstring::get_tmp(_("Waiting for meta-data...")); } if(shutting_down) { for(int i=0; iIsActive()) continue; const char *status=trackers[i]->Status(); if(status[0]) { xstring &s=xstring::get_tmp(_("Shutting down: ")); if(trackers.count()>1) s.appendf("%d. ",i+1); s.append(status); return s; } } return xstring::get_tmp(""); } if(total_length==0) return xstring::get_tmp(""); char dn_buf[LONGEST_HUMAN_READABLE + 1]; char up_buf[LONGEST_HUMAN_READABLE + 1]; xstring& buf=xstring::format("dn:%s %sup:%s %s", human_readable(total_recv, dn_buf, human_autoscale|human_SI, 1, 1), recv_rate.GetStrS(), human_readable(total_sent, up_buf, human_autoscale|human_SI, 1, 1), send_rate.GetStrS()); if(!complete) { buf.appendf("complete:%u/%u (%u%%)",complete_pieces,total_pieces, unsigned((total_length-total_left)*100/total_length)); buf.append(' '); if(min_piece_sources) buf.append(recv_rate.GetETAStrFromSize(total_left)); if(end_game) buf.append(" end-game"); } else { buf.appendf("complete, ratio:%.2f/%.2f/%.2f", GetMinPerPieceRatio(),GetRatio(),GetMaxPerPieceRatio()); } return buf; } TorrentPeer::TorrentPeer(Torrent *p,const sockaddr_u *a,int t_no) : timeout_timer(360), retry_timer(30), keepalive_timer(120), choke_timer(10), interest_timer(10), activity_timer(300), msg_ext_metadata(0), msg_ext_pex(0), metadata_size(0) { parent=p; tracker_no=t_no; addr=*a; sock=-1; udp_port=0; connected=false; passive=false; duplicate=0; myself=false; peer_choking=true; am_choking=true; peer_interested=false; am_interested=false; upload_only=false; peer_complete_pieces=0; retry_timer.Stop(); retry_timer.AddRandom(2); choke_timer.Stop(); interest_timer.Stop(); last_piece=NO_PIECE; if(addr.is_reserved() || addr.is_multicast() || addr.port()==0) SetError("invalid peer address"); peer_bytes_pool[0]=peer_bytes_pool[1]=0; peer_recv=peer_sent=0; invalid_piece_count=0; } TorrentPeer::~TorrentPeer() { } void TorrentPeer::PrepareToDie() { Disconnect(); } void TorrentPeer::Connect(int s,IOBuffer *rb) { sock=s; recv_buf=rb; connected=true; passive=true; } void TorrentPeer::SetError(const char *s) { error=Error::Fatal(s); LogError(11,"fatal error: %s",s); Disconnect(s); } void TorrentPeer::Restart() { if(!Connected()) return; Disconnect(); retry_timer.Stop(); retry_timer.AddRandom(2); } void TorrentPeer::Disconnect(const char *dc) { Enter(); if(Connected() && !recv_buf->Eof()) LogNote(4,"closing connection"); recv_queue.empty(); ClearSentQueue(); if(peer_bitfield) { for(unsigned p=0; ptotal_pieces; p++) SetPieceHaving(p,false); peer_bitfield=0; } peer_id.unset(); fast_set.empty(); suggested_set.empty(); recv_buf=0; send_buf=0; if(sock!=-1) { close(sock); sock=-1; connected=false; last_dc.set(dc); } parent->am_interested_peers_count-=am_interested; am_interested=false; parent->am_not_choking_peers_count-=!am_choking; am_choking=true; peer_interested=false; peer_choking=true; peer_complete_pieces=0; retry_timer.Reset(); choke_timer.Stop(); interest_timer.Stop(); // return to main pool parent->PeerBytesUsed(-peer_bytes_pool[RateLimit::GET],RateLimit::GET); parent->PeerBytesUsed(-peer_bytes_pool[RateLimit::PUT],RateLimit::PUT); peer_bytes_pool[0]=peer_bytes_pool[1]=0; Leave(); } void TorrentPeer::SendHandshake() { const char *const protocol="BitTorrent protocol"; int proto_len=strlen(protocol); send_buf->PackUINT8(proto_len); send_buf->Put(protocol,proto_len); static char extensions[8] = { // extensions[7]&0x01 - DHT Protocol (http://www.bittorrent.org/beps/bep_0005.html) // extensions[7]&0x04 - Fast Extension (http://www.bittorrent.org/beps/bep_0006.html) // extensions[5]&0x10 - Extension Protocol (http://www.bittorrent.org/beps/bep_0010.html) 0, 0, 0, 0, 0, 0x10, 0, 0x05, }; if(ResMgr::QueryBool("torrent:use-dht",0)) extensions[7]|=0x01; else extensions[7]&=~0x01; send_buf->Put(extensions,8); send_buf->Put(parent->info_hash); send_buf->Put(parent->my_peer_id); LogSend(9,"handshake"); } TorrentPeer::unpack_status_t TorrentPeer::RecvHandshake() { unsigned proto_len=0; if(recv_buf->Size()>0) proto_len=recv_buf->UnpackUINT8(); if((unsigned)recv_buf->Size()<1+proto_len+8+SHA1_DIGEST_SIZE+Torrent::PEER_ID_LEN) return recv_buf->Eof() ? UNPACK_PREMATURE_EOF : UNPACK_NO_DATA_YET; int unpacked=1; const char *data=recv_buf->Get(); xstring protocol(data+unpacked,proto_len); unpacked+=proto_len; memcpy(extensions,data+unpacked,8); unpacked+=8; // 8 bytes are reserved (extensions) xstring peer_info_hash(data+unpacked,SHA1_DIGEST_SIZE); unpacked+=SHA1_DIGEST_SIZE; if(peer_info_hash.ne(parent->info_hash)) { LogError(0,"got info_hash: %s, wanted: %s",peer_info_hash.hexdump(),parent->info_hash.hexdump()); SetError("peer info_hash mismatch"); return UNPACK_WRONG_FORMAT; } const xstring& tmp_peer_id=xstring::get_tmp(recv_buf->Get()+unpacked,Torrent::PEER_ID_LEN); unpacked+=Torrent::PEER_ID_LEN; // if we have already such a peer, then this peer or the other one // much be marked as duplicate and then removed in ScanPeers. duplicate=parent->FindPeerById(tmp_peer_id); if(duplicate && !duplicate->Connected()) { duplicate->duplicate=this; duplicate=0; } peer_id.set(tmp_peer_id); recv_buf->Skip(unpacked); LogRecv(4,xstring::format("handshake, %s, peer_id: %s, reserved: %02x%02x%02x%02x%02x%02x%02x%02x", protocol.dump(),url::encode(peer_id,"").get(), extensions[0],extensions[1],extensions[2],extensions[3], extensions[4],extensions[5],extensions[6],extensions[7])); return UNPACK_SUCCESS; } void TorrentPeer::SendExtensions() { if(!LTEPExtensionEnabled()) return; xmap_p m; m.add("ut_metadata",new BeNode(MSG_EXT_METADATA)); m.add("ut_pex",new BeNode(MSG_EXT_PEX)); xmap_p ext; ext.add("m",new BeNode(&m)); ext.add("p",new BeNode(parent->GetPort())); ext.add("v",new BeNode(PACKAGE "/" VERSION)); ext.add("reqq",new BeNode(MAX_QUEUE_LEN*16)); if(parent->Complete()) ext.add("upload_only",new BeNode(1)); if(parent->metadata) ext.add("metadata_size",new BeNode(parent->metadata.length())); const char *ip=ResMgr::Query("torrent:ip",0); sockaddr_u sa; socklen_t sa_len=sizeof(sa); if((ip && ip[0] && inet_aton(ip,&sa.in.sin_addr)) || (getsockname(sock,&sa.sa,&sa_len)!=-1 && sa.sa.sa_family==AF_INET)) ext.add("ipv4",new BeNode((const char*)&sa.in.sin_addr,4)); #if INET6 const char *ipv6=ResMgr::Query("torrent:ipv6",0); sa_len=sizeof(sa); if((ipv6 && ipv6[0] && inet_pton(AF_INET6,ipv6,&sa.in6.sin6_addr)>0) || (getsockname(sock,&sa.sa,&sa_len)!=-1 && sa.sa.sa_family==AF_INET6)) ext.add("ipv6",new BeNode((const char*)&sa.in6.sin6_addr,16)); #endif sa_len=sizeof(sa); if(getpeername(sock,&sa.sa,&sa_len)!=-1) { if(sa.sa.sa_family==AF_INET) ext.add("yourip",new BeNode((const char*)&sa.in.sin_addr,4)); #if INET6 else if(sa.sa.sa_family==AF_INET6) ext.add("yourip",new BeNode((const char*)&sa.in6.sin6_addr,16)); #endif } PacketExtended pkt(MSG_EXT_HANDSHAKE,new BeNode(&ext)); pkt.Pack(send_buf); LogSend(9,xstring::format("extended(%u,%s)",pkt.code,pkt.data->Format1())); } void TorrentPeer::SendDataReply() { const PacketRequest *p=recv_queue.next(); Enter(parent); const xstring& data=parent->RetrieveBlock(p->index,p->begin,p->req_length); Leave(parent); if(!Connected()) // we can be disconnected by parent return; if(data.length()!=p->req_length) { if(parent->my_bitfield->get_bit(p->index)) parent->SetError(xstring::format("failed to read piece %u",p->index)); return; } LogSend(8,xstring::format("piece:%u begin:%u size:%u",p->index,p->begin,p->req_length)); PacketPiece(p->index,p->begin,data).Pack(send_buf); peer_sent+=data.length(); peer_send_rate.Add(data.length()); parent->AccountSend(p->index,data.length()); BytesPut(data.length()); activity_timer.Reset(); } int TorrentPeer::SendDataRequests(unsigned p) { if(p==NO_PIECE) return 0; if(parent->my_bitfield->get_bit(p) || !peer_bitfield || !peer_bitfield->get_bit(p)) return 0; int sent=0; unsigned blocks=parent->BlocksInPiece(p); unsigned bytes_allowed=BytesAllowed(RateLimit::GET); for(unsigned b=0; bBlockPresent(p,b)) continue; if(parent->piece_info[p].downloader_for(b)) { if(!parent->end_game) continue; if(parent->piece_info[p].downloader_for(b)==this) continue; if(FindRequest(p,b*Torrent::BLOCK_SIZE)>=0) continue; } unsigned begin=b*Torrent::BLOCK_SIZE; unsigned len=Torrent::BLOCK_SIZE; if(b==blocks-1) { assert(beginPieceLength(p)); unsigned max_len=parent->PieceLength(p)-begin; if(len>max_len) len=max_len; } if(bytes_allowedSetDownloader(p,b,0,this); PacketRequest *req=new PacketRequest(p,b*Torrent::BLOCK_SIZE,len); LogSend(6,xstring::format("request piece:%u begin:%u size:%u",p,b*Torrent::BLOCK_SIZE,len)); req->Pack(send_buf); sent_queue.push(req); SetLastPiece(p); sent++; activity_timer.Reset(); bytes_allowed-=len; BytesGot(len); if(sent_queue.count()>=MAX_QUEUE_LEN) break; } return sent; } bool TorrentPeer::InFastSet(unsigned p) const { for(int i=0; i=MAX_QUEUE_LEN) return; if(!BytesAllowedToGet(Torrent::BLOCK_SIZE)) return; if(peer_choking) { // try to continue getting last piece if it is in the fast set unsigned last_piece=GetLastPiece(); if(last_piece!=NO_PIECE && InFastSet(last_piece) && SendDataRequests(last_piece)>0) return; // try fast set when choking while(fast_set.count()>0) { unsigned p=fast_set[0]; if(SendDataRequests(p)>0) return; fast_set.next(); } return; } // try to continue getting last piece if(SendDataRequests(GetLastPiece())>0) return; // try suggested pieces while(suggested_set.count()>0) { unsigned p=suggested_set.next(); if(SendDataRequests(p)>0) return; } // pick a new piece unsigned p=NO_PIECE; for(int i=0; ipieces_needed.count(); i++) { if(peer_bitfield->get_bit(parent->pieces_needed[i])) { p=parent->pieces_needed[i]; if(parent->my_bitfield->get_bit(p)) continue; // add some randomness, so that different instances don't synchronize if(parent->AllBlocksAbsent(p) && random()/13%16==0) continue; if(SendDataRequests(p)>0) return; } } if(p==NO_PIECE && interest_timer.Stopped()) SetAmInterested(false); } void TorrentPeer::Have(unsigned p) { if(!send_buf) return; Enter(); LogSend(9,xstring::format("have(%u)",p)); PacketHave(p).Pack(send_buf); Leave(); } int TorrentPeer::FindRequest(unsigned piece,unsigned begin) const { for(int i=0; iindex==piece && req->begin==begin) return i; } return -1; } void TorrentPeer::CancelBlock(unsigned p,unsigned b) { if(!send_buf) return; Enter(); int i=FindRequest(p,b); if(i>=0) { const PacketRequest *req=sent_queue[i]; LogSend(9,xstring::format("cancel(%u,%u)",p,b)); PacketCancel(p,b,req->req_length).Pack(send_buf); parent->SetDownloader(p,b/Torrent::BLOCK_SIZE,this,0); sent_queue.remove(i); } Leave(); } // mark that peer as having an invalid piece void TorrentPeer::MarkPieceInvalid(unsigned p) { invalid_piece_count++; SetPieceHaving(p,false); SetAmInterested(am_interested); if(invalid_piece_count>5) parent->BlackListPeer(this,"1d"); } void TorrentPeer::ClearSentQueue(int i) { if(i<0) return; if(!FastExtensionEnabled()) { // without Fast Extension we assume sequential packet processing, // thus clear also all sent requests before this one. while(i-->=0) { const PacketRequest *req=sent_queue.next(); parent->PeerBytesGot(-req->req_length); parent->SetDownloader(req->index,req->begin/Torrent::BLOCK_SIZE,this,0); } } else { const PacketRequest *req=sent_queue[i]; parent->PeerBytesGot(-req->req_length); parent->SetDownloader(req->index,req->begin/Torrent::BLOCK_SIZE,this,0); sent_queue.remove(i); } } int TorrentPeer::BytesAllowed(RateLimit::dir_t dir) { int a=parent->PeerBytesAllowed(this,dir); int pool_target=Torrent::BLOCK_SIZE*2; if(peer_bytes_pool[dir]a) to_pool=a; peer_bytes_pool[dir]+=to_pool; a-=to_pool; parent->PeerBytesUsed(to_pool,dir); } return peer_bytes_pool[dir]+a; } bool TorrentPeer::BytesAllowed(RateLimit::dir_t dir,unsigned bytes) { int a=BytesAllowed(dir); if(bytes<=(unsigned)a) return true; TimeoutS(1); return false; } void TorrentPeer::BytesUsed(int b,RateLimit::dir_t dir) { if(peer_bytes_pool[dir]>=b) peer_bytes_pool[dir]-=b; else { b-=peer_bytes_pool[dir]; peer_bytes_pool[dir]=0; parent->PeerBytesUsed(b,dir); } } unsigned TorrentPeer::GetLastPiece() const { if(!peer_bitfield) return NO_PIECE; unsigned p=last_piece; // continue if have any blocks already if(p!=NO_PIECE && !parent->my_bitfield->get_bit(p) && parent->AnyBlocksPresent(p) && peer_bitfield->get_bit(p)) return p; p=parent->last_piece; if(p!=NO_PIECE && !parent->my_bitfield->get_bit(p) && peer_bitfield->get_bit(p)) return p; p=last_piece; if(p!=NO_PIECE && !parent->my_bitfield->get_bit(p) && peer_bitfield->get_bit(p)) return p; return NO_PIECE; } void TorrentPeer::SetLastPiece(unsigned p) { if(last_piece==NO_PIECE || parent->my_bitfield->get_bit(last_piece)) last_piece=p; if(parent->last_piece==NO_PIECE || parent->my_bitfield->get_bit(parent->last_piece)) parent->last_piece=p; } void TorrentPeer::SetAmInterested(bool interest) { if(invalid_piece_count>5) interest=false; if(am_interested==interest) return; Enter(); LogSend(6,interest?"interested":"uninterested"); Packet(interest?MSG_INTERESTED:MSG_UNINTERESTED).Pack(send_buf); parent->am_interested_peers_count+=(interest-am_interested); am_interested=interest; interest_timer.Reset(); if(am_interested) parent->am_interested_timer.Reset(); (void)BytesAllowed(RateLimit::GET); // draw some bytes from the common pool Leave(); } void TorrentPeer::SetAmChoking(bool c) { if(am_choking==c) return; Enter(); LogSend(6,c?"choke":"unchoke"); Packet(c?MSG_CHOKE:MSG_UNCHOKE).Pack(send_buf); parent->am_not_choking_peers_count-=(c-am_choking); am_choking=c; choke_timer.Reset(); if(am_choking) { if(!FastExtensionEnabled()) { recv_queue.empty(); } else { // send rejects while(recv_queue.count()>0) { const PacketRequest *p=recv_queue.next(); LogSend(6,xstring::format("reject-request piece:%u begin:%u size:%u",p->index,p->begin,p->req_length)); PacketRejectRequest(p->index,p->begin,p->req_length).Pack(send_buf); } } } Leave(); } void TorrentPeer::SetPieceHaving(unsigned p,bool have) { int diff = (have - peer_bitfield->get_bit(p)); if(!diff) return; parent->piece_info[p].add_sources_count(diff); peer_complete_pieces+=diff; peer_bitfield->set_bit(p,have); if(parent->piece_info[p].get_sources_count()==0) parent->SetPieceNotWanted(p); if(have && send_buf && !am_interested && !parent->my_bitfield->get_bit(p) && parent->NeedMoreUploaders()) { SetAmInterested(true); SetLastPiece(p); } } void TorrentPeer::HandlePacket(Packet *p) { switch(p->GetPacketType()) { case MSG_KEEPALIVE: { LogRecv(5,"keep-alive"); break; } case MSG_CHOKE: { LogRecv(5,"choke"); peer_choking=true; ClearSentQueue(); // discard pending requests break; } case MSG_UNCHOKE: { LogRecv(5,"unchoke"); peer_choking=false; if(am_interested) SendDataRequests(); break; } case MSG_INTERESTED: { LogRecv(5,"interested"); peer_interested=true; break; } case MSG_UNINTERESTED: { LogRecv(5,"uninterested"); recv_queue.empty(); peer_interested=false; break; } case MSG_HAVE: { if(!parent->HasMetadata()) break; PacketHave *pp=static_cast(p); LogRecv(5,xstring::format("have(%u)",pp->piece)); if(!parent->HasMetadata()) break; if(pp->piece>=parent->total_pieces) { SetError("invalid piece index"); break; } SetPieceHaving(pp->piece,true); break; } case MSG_BITFIELD: { if(!parent->HasMetadata()) break; PacketBitField *pp=static_cast(p); if(pp->bitfield->count()<(int)parent->total_pieces/8) { LogError(9,"bitfield length %d, expected %u",pp->bitfield->count(),parent->total_pieces/8); SetError("invalid bitfield length"); break; } if(pp->bitfield->has_any_set(parent->total_pieces,pp->bitfield->get_bit_length())) { SetError("bitfield has spare bits set"); break; } for(unsigned p=0; ptotal_pieces; p++) SetPieceHaving(p,pp->bitfield->get_bit(p)); LogRecv(5,xstring::format("bitfield(%u/%u)",peer_complete_pieces,parent->total_pieces)); break; } case MSG_PORT: { PacketPort *pp=static_cast(p); LogRecv(5,xstring::format("port(%u)",pp->port)); udp_port=pp->port; if(DHT_Enabled() && Torrent::dht) { sockaddr_u a(addr); a.set_port(udp_port); Torrent::GetDHT(a)->SendPing(a); } break; } case MSG_HAVE_ALL: { LogRecv(5,"have-all"); if(!FastExtensionEnabled()) { SetError("fast extension is disabled"); break; } if(!parent->HasMetadata()) break; for(unsigned p=0; ptotal_pieces; p++) SetPieceHaving(p,1); break; } case MSG_HAVE_NONE: { LogRecv(5,"have-none"); if(!FastExtensionEnabled()) { SetError("fast extension is disabled"); break; } if(!parent->HasMetadata()) break; for(unsigned p=0; ptotal_pieces; p++) SetPieceHaving(p,0); break; } case MSG_SUGGEST_PIECE: { PacketSuggestPiece *pp=static_cast(p); LogRecv(5,xstring::format("suggest-piece:%u",pp->piece)); if(!FastExtensionEnabled()) { SetError("fast extension is disabled"); break; } if(!parent->HasMetadata()) break; if(pp->piece>=parent->total_pieces) { SetError("invalid piece index"); break; } suggested_set.push(pp->piece); break; } case MSG_ALLOWED_FAST: { PacketAllowedFast *pp=static_cast(p); LogRecv(5,xstring::format("allowed-fast:%u",pp->piece)); if(!FastExtensionEnabled()) { SetError("fast extension is disabled"); break; } if(!parent->HasMetadata()) break; if(pp->piece>=parent->total_pieces) { SetError("invalid piece index"); break; } fast_set.push(pp->piece); break; } case MSG_REJECT_REQUEST: { PacketRejectRequest *pp=static_cast(p); LogRecv(5,xstring::format("reject-request(%u,%u)",pp->index,pp->begin)); if(!FastExtensionEnabled()) { SetError("fast extension is disabled"); break; } int i=FindRequest(pp->index,pp->begin); if(i>=0) ClearSentQueue(i); break; } case MSG_EXTENDED: { PacketExtended *pp=static_cast(p); LogRecv(9,xstring::format("extended(%u,%s)",pp->code,pp->data->Format1())); HandleExtendedMessage(pp); break; } case MSG_PIECE: { PacketPiece *pp=static_cast(p); LogRecv(7,xstring::format("piece:%u begin:%u size:%u",pp->index,pp->begin,(unsigned)pp->data.length())); if(!parent->HasMetadata()) break; if(pp->index>=parent->total_pieces) { SetError("invalid piece index"); break; } if(pp->begin>=parent->PieceLength(pp->index)) { SetError("invalid data offset"); break; } if(pp->begin+pp->data.length() > parent->PieceLength(pp->index)) { SetError("invalid data length"); break; } int i=FindRequest(pp->index,pp->begin); if(i<0) { // SetError("got a piece that was not requested"); break; } ClearSentQueue(i); parent->PeerBytesGot(pp->data.length()); // re-take the bytes returned by ClearSentQueue Enter(parent); parent->StoreBlock(pp->index,pp->begin,pp->data.length(),pp->data.get(),this); Leave(parent); int len=pp->data.length(); peer_recv+=len; peer_recv_rate.Add(len); parent->AccountRecv(pp->index,len); // request another block from the same piece if(am_interested && (!peer_choking || InFastSet(pp->index))) SendDataRequests(pp->index); break; } case MSG_REQUEST: { PacketRequest *pp=static_cast(p); LogRecv(5,xstring::format("request for piece:%u begin:%u size:%u",pp->index,pp->begin,pp->req_length)); if(pp->req_length>Torrent::BLOCK_SIZE*2) { SetError("too large request"); break; } if(!parent->HasMetadata()) break; if(am_choking) break; if(pp->index>=parent->total_pieces) { SetError("invalid piece index"); break; } if(pp->begin>=parent->PieceLength(pp->index)) { SetError("invalid data offset"); break; } if(pp->begin+pp->req_length > parent->PieceLength(pp->index)) { SetError("invalid data length"); break; } if(recv_queue.count()>=MAX_QUEUE_LEN*16) { SetError("too many requests"); break; } recv_queue.push(pp); activity_timer.Reset(); p=0; break; } case MSG_CANCEL: { PacketCancel *pp=static_cast(p); LogRecv(5,xstring::format("cancel(%u,%u)",pp->index,pp->begin)); for(int i=0; iindex==pp->index && req->begin==pp->begin) { recv_queue.remove(i); break; } } break; } } delete p; } void Torrent::MetadataDownloaded() { xstring new_info_hash; SHA1(md_download,new_info_hash); if(info_hash && info_hash.ne(new_info_hash)) { LogError(1,"downloaded metadata does not match info_hash, retrying"); StartMetadataDownload(); return; } if(SetMetadata(md_download)) Startup(); md_download.unset(); } void TorrentPeer::SendMetadataRequest() { if(!msg_ext_metadata || !parent->md_download || parent->md_download.length()>=metadata_size || parent->md_download.length()%Torrent::BLOCK_SIZE) return; xmap_p req; req.add("msg_type",new BeNode(UT_METADATA_REQUEST)); req.add("piece",new BeNode(parent->md_download.length()/Torrent::BLOCK_SIZE)); PacketExtended pkt(msg_ext_metadata,new BeNode(&req)); LogSend(4,xstring::format("ut_metadata request %s",pkt.data->Format1())); pkt.Pack(send_buf); } void TorrentPeer::HandleExtendedMessage(PacketExtended *pp) { if(pp->data->type!=BeNode::BE_DICT) { SetError("extended type must be DICT"); return; } if(pp->code==MSG_EXT_HANDSHAKE) { BeNode *m=pp->data->lookup("m",BeNode::BE_DICT); if(m) { msg_ext_metadata=m->lookup_int("ut_metadata"); msg_ext_pex=m->lookup_int("ut_pex"); } metadata_size=parent->metadata_size=pp->data->lookup_int("metadata_size"); upload_only=pp->data->lookup_int("upload_only"); if(!parent->HasMetadata() && !msg_ext_metadata) { Disconnect("peer cannot provide metadata"); return; } const xstring& v=pp->data->lookup_str("v"); if(v) LogNote(3,"peer version is %s",v.get()); const xstring& myip=pp->data->lookup_str("yourip"); if(myip && myip.length()==4) { char ip[16]; inet_ntop(AF_INET,myip.get(),ip,sizeof(ip)); LogNote(5,"my external IPv4 is %s",ip); } if(passive) { // use specified port number to connect back int p=pp->data->lookup_int("p"); if(p && p>=1024 && p<=65535) { LogNote(9,"using port %d to connect back",p); addr.set_port(p); passive=false; // check the black list if(Torrent::BlackListed(this)) { SetError("blacklisted"); return; } // check for duplicates TaskRefArray& peers=parent->peers; for(int i=0; iAddressEq(this)) { if(!peers[i]->Connected()) peers[i]->duplicate=this; else duplicate=peers[i].get_non_const(); return; } } } } if(msg_ext_metadata && parent->md_download) SendMetadataRequest(); } else if(pp->code==MSG_EXT_METADATA) { BeNode *msg_type=pp->data->lookup("msg_type",BeNode::BE_INT); if(!msg_type) { SetError("ut_metadata msg_type bad or missing"); return; } BeNode *piece=pp->data->lookup("piece",BeNode::BE_INT); if(!piece || piece->num<0 || piece->num>=INT_MAX/Torrent::BLOCK_SIZE) { SetError("ut_metadata piece bad or missing"); return; } size_t offset=piece->num*Torrent::BLOCK_SIZE; xmap_p reply; switch(msg_type->num) { case UT_METADATA_REQUEST: { if(offset>parent->metadata.length()) { reply.add("msg_type",new BeNode(UT_METADATA_REJECT)); reply.add("piece",new BeNode(piece->num)); PacketExtended pkt(msg_ext_metadata,new BeNode(&reply)); LogSend(4,xstring::format("ut_metadata reject %s",pkt.data->Format1())); pkt.Pack(send_buf); break; } const char *d=parent->metadata+offset; unsigned len=parent->metadata.length()-offset; if(len>Torrent::BLOCK_SIZE) len=Torrent::BLOCK_SIZE; reply.add("msg_type",new BeNode(UT_METADATA_DATA)); reply.add("piece",new BeNode(piece->num)); reply.add("total_size",new BeNode(parent->metadata.length())); PacketExtended pkt(msg_ext_metadata,new BeNode(&reply)); LogSend(4,xstring::format("ut_metadata data %s",pkt.data->Format1())); pkt.SetAppendix(d,len); pkt.Pack(send_buf); break; } case UT_METADATA_DATA: { if(parent->md_download) { if(offset==parent->md_download.length()) { BeNode *b_size=pp->data->lookup("total_size",BeNode::BE_INT); if(b_size) { if(metadata_size && metadata_size!=(size_t)b_size->num) { SetError("metadata_size mismatch with total_size"); return; } metadata_size=b_size->num; parent->metadata_size=metadata_size; } parent->md_download.append(pp->appendix); if(pp->appendix.length()MetadataDownloaded(); } SendMetadataRequest(); } break; } case UT_METADATA_REJECT: break; default: SetError("ut_metadata msg_type invalid value"); return; } } else if(pp->code==MSG_EXT_PEX) { if(!pex.recv_timer.Stopped()) return; pex.recv_timer.Reset(); BeNode *added=pp->data->lookup("added",BeNode::BE_STR); BeNode *added6=pp->data->lookup("added6",BeNode::BE_STR); BeNode *added_f=pp->data->lookup("added.f",BeNode::BE_STR); BeNode *added6_f=pp->data->lookup("added6.f",BeNode::BE_STR); AddPEXPeers(added,added_f,6); AddPEXPeers(added6,added6_f,18); } } void TorrentPeer::AddPEXPeers(BeNode *added,BeNode *added_f,int addr_size) { if(!added) return; const char *data=added->str; unsigned n=added->str.length()/addr_size; if(n>50) n=50; const char *flags=0; if(added_f && added_f->str.length()==n) flags=added_f->str; int peers_count=0; for(unsigned i=0; iComplete() && (f&pex.SEED)) continue; sockaddr_u a; a.set_compact(data,addr_size); if(!a.is_compatible(this->addr)) continue; parent->AddPeer(new TorrentPeer(parent,&a,TR_PEX)); peers_count++; } if(peers_count>0) LogNote(4,"%d %s peers added from PEX message",peers_count,addr_size==6?"ipv4":"ipv6"); } void TorrentPeer::SendPEXPeers() { pex.send_timer.Reset(); if(!msg_ext_pex || parent->Private()) return; xmap old_sent; old_sent.move_here(pex.sent); int peer_count=0; xstring added; xstring added6; xstring added_f; xstring added6_f; xstring dropped; xstring dropped6; int a=0,a6=0,d=0,d6=0; for(int i=parent->peers.count(); i>0; i--) { const TorrentPeer *peer=parent->peers[i-1]; if(!peer->Connected() || peer->IsPassive() || peer->Failed() || !peer->addr.is_compatible(this->addr) || peer==this || peer->myself) continue; const xstring& ca=peer->addr.compact(); if(old_sent.exists(ca)) { old_sent.remove(ca); continue; } unsigned char f=pex.CONNECTABLE; if(peer->Seed()) f|=pex.SEED; peer_count++; if(peer_count>50) continue; if(ca.length()==6) { added.append(ca); added_f.append(f); a++; } else { added6.append(ca); added6_f.append(f); a6++; } pex.sent.add(ca,f); } peer_count=0; for(old_sent.each_begin(); !old_sent.each_finished(); old_sent.each_next()) { const xstring& ca=old_sent.each_key(); peer_count++; if(peer_count>50) { // drop it later pex.sent.add(ca,0); continue; } if(ca.length()==6) { dropped.append(ca); d++; } else { dropped6.append(ca); d6++; } } if(a+a6+d+d6==0) return; xmap_p req; if(a) { req.add("added",new BeNode(added)); req.add("added.f",new BeNode(added_f)); } if(a6) { req.add("added6",new BeNode(added6)); req.add("added6.f",new BeNode(added6_f)); } if(d) req.add("dropped",new BeNode(dropped)); if(d6) req.add("dropped6",new BeNode(dropped6)); PacketExtended pkt(msg_ext_pex,new BeNode(&req)); LogSend(4,xstring::format("ut_pex message: added=[%d,%d], dropped=[%d,%d]",a,a6,d,d6)); pkt.Pack(send_buf); } bool TorrentPeer::HasNeededPieces() { if(!peer_bitfield) return false; if(GetLastPiece()!=NO_PIECE) return true; for(int i=0; ipieces_needed.count(); i++) if(peer_bitfield->get_bit(parent->pieces_needed[i])) return true; return false; } int TorrentPeer::Do() { int m=STALL; if(error || myself) return m; if(sock==-1) { if(passive) return m; if(!retry_timer.Stopped()) return m; if(parent->IsValidating()) return m; sock=SocketCreateTCP(addr.sa.sa_family,0); if(sock==-1) { if(NonFatalError(errno)) return m; SetError(xstring::format(_("cannot create socket of address family %d"),addr.sa.sa_family)); return MOVED; } LogNote(4,_("Connecting to peer %s port %u"),SocketNumericAddress(&addr),SocketPort(&addr)); connected=false; } if(!connected) { int res=SocketConnect(sock,&addr); if(res==-1 && errno!=EINPROGRESS && errno!=EALREADY && errno!=EISCONN) { int e=errno; const char *error=strerror(e); LogError(4,"connect(%s): %s\n",GetName(),error); Disconnect(error); if(FA::NotSerious(e) && !ActivityTimedOut()) return MOVED; SetError(error); return MOVED; } if(res==-1 && errno!=EISCONN) { Block(sock,POLLOUT); return m; } connected=true; timeout_timer.Reset(); m=MOVED; } if(!recv_buf) { recv_buf=new IOBufferFDStream(new FDStream(sock,""),IOBuffer::GET); } if(!send_buf) { send_buf=new IOBufferFDStream(new FDStream(sock,""),IOBuffer::PUT); SendHandshake(); } if(send_buf->Error()) { LogError(2,"send: %s",send_buf->ErrorText()); Disconnect(send_buf->ErrorText()); return MOVED; } if(recv_buf->Error()) { LogError(2,"receive: %s",recv_buf->ErrorText()); Disconnect(recv_buf->ErrorText()); return MOVED; } if(!peer_id) { // expect handshake unpack_status_t s=RecvHandshake(); if(s==UNPACK_NO_DATA_YET) return m; if(s!=UNPACK_SUCCESS) { if(s==UNPACK_PREMATURE_EOF) { if(recv_buf->Size()>0) { LogError(2,_("peer unexpectedly closed connection after %s"),recv_buf->Dump()); Disconnect(_("peer unexpectedly closed connection")); } else { LogError(4,_("peer closed connection (before handshake)")); Disconnect(_("peer closed connection (before handshake)")); } } else { Disconnect(_("invalid peer response format")); } return MOVED; } if(!parent->HasMetadata() && !LTEPExtensionEnabled()) { Disconnect("peer cannot provide metadata"); return MOVED; } timeout_timer.Reset(); myself=peer_id.eq(Torrent::my_peer_id); if(myself) return MOVED; SendExtensions(); if(parent->HasMetadata()) peer_bitfield=new BitField(parent->total_pieces); if(FastExtensionEnabled()) { if(parent->complete_pieces==0) { LogSend(5,"have-none"); Packet(MSG_HAVE_NONE).Pack(send_buf); } else if(parent->complete_pieces==parent->total_pieces) { LogSend(5,"have-all"); Packet(MSG_HAVE_ALL).Pack(send_buf); } else { LogSend(5,"bitfield"); PacketBitField(parent->my_bitfield).Pack(send_buf); } } else if(parent->my_bitfield && parent->my_bitfield->has_any_set()) { LogSend(5,"bitfield"); PacketBitField(parent->my_bitfield).Pack(send_buf); } if(Torrent::listener_udp && DHT_Enabled()) { int udp_port=Torrent::listener_udp->GetPort(); #if INET6 if(Torrent::listener_ipv6_udp && addr.sa.sa_family==AF_INET6) udp_port=Torrent::listener_ipv6_udp->GetPort(); #endif if(udp_port) { LogSend(5,xstring::format("port(%d)",udp_port)); PacketPort(udp_port).Pack(send_buf); } } keepalive_timer.Reset(); } if(keepalive_timer.Stopped()) { LogSend(5,"keep-alive"); Packet(MSG_KEEPALIVE).Pack(send_buf); keepalive_timer.Reset(); } if(send_buf->Size()>(int)Torrent::BLOCK_SIZE*4) recv_buf->Suspend(); else recv_buf->Resume(); if(recv_buf->IsSuspended()) return m; timeout_timer.Reset(send_buf->EventTime()); timeout_timer.Reset(recv_buf->EventTime()); if(timeout_timer.Stopped()) { LogError(0,_("Timeout - reconnecting")); Disconnect("timed out"); return MOVED; } if(!am_interested && interest_timer.Stopped() && HasNeededPieces() && parent->NeedMoreUploaders()) SetAmInterested(true); if(am_interested && sent_queue.count()AllowMoreDownloaders()) SetAmChoking(false); if(recv_queue.count()>0 && send_buf->Size()<(int)Torrent::BLOCK_SIZE*2) { unsigned bytes_allowed=BytesAllowed(RateLimit::PUT); while(bytes_allowed>=recv_queue[0]->req_length) { bytes_allowed-=recv_queue[0]->req_length; SendDataReply(); m=MOVED; if(!Connected()) return m; if(recv_queue.count()==0) break; if(send_buf->Size()>=(int)Torrent::BLOCK_SIZE) m|=send_buf->Do(); if(send_buf->Size()>=(int)Torrent::BLOCK_SIZE*2) break; } } if(recv_buf->Eof() && recv_buf->Size()==0) { LogError(4,_("peer closed connection")); Disconnect(_("peer closed connection")); return MOVED; } if(pex.send_timer.Stopped()) SendPEXPeers(); Packet *reply=0; unpack_status_t st=UnpackPacket(recv_buf,&reply); if(st==UNPACK_NO_DATA_YET) return m; if(st!=UNPACK_SUCCESS) { if(st==UNPACK_PREMATURE_EOF) { LogError(2,_("peer unexpectedly closed connection after %s"),recv_buf->Dump()); Disconnect(_("peer unexpectedly closed connection")); } else { LogError(2,_("invalid peer response format")); Disconnect(_("invalid peer response format")); } return MOVED; } reply->DropData(recv_buf); HandlePacket(reply); return MOVED; } TorrentPeer::unpack_status_t TorrentPeer::UnpackPacket(SMTaskRef& b,TorrentPeer::Packet **p) { Packet *&pp=*p; pp=0; Ref probe(new Packet); unpack_status_t res=probe->Unpack(b); if(res!=UNPACK_SUCCESS) return res; LogRecvF(11,"got a packet, length=%d, type=%d(%s)\n", probe->GetLength(),probe->GetPacketType(),probe->GetPacketTypeText()); switch(probe->GetPacketType()) { case MSG_KEEPALIVE: case MSG_CHOKE: case MSG_UNCHOKE: case MSG_INTERESTED: case MSG_UNINTERESTED: case MSG_HAVE_ALL: case MSG_HAVE_NONE: pp=probe.borrow(); break; case MSG_HAVE: pp=new PacketHave(); break; case MSG_BITFIELD: pp=new PacketBitField(); break; case MSG_REQUEST: pp=new PacketRequest(); break; case MSG_PIECE: pp=new PacketPiece(); break; case MSG_CANCEL: pp=new PacketCancel(); break; case MSG_PORT: pp=new PacketPort(); break; case MSG_SUGGEST_PIECE: pp=new PacketSuggestPiece(); break; case MSG_ALLOWED_FAST: pp=new PacketAllowedFast(); break; case MSG_REJECT_REQUEST: pp=new PacketRejectRequest(); break; case MSG_EXTENDED: pp=new PacketExtended(); break; } if(probe) res=pp->Unpack(b); if(res!=UNPACK_SUCCESS) { switch(res) { case UNPACK_PREMATURE_EOF: LogError(0,"premature eof"); break; case UNPACK_WRONG_FORMAT: LogError(0,"wrong packet format"); break; case UNPACK_NO_DATA_YET: case UNPACK_SUCCESS: ; } if(probe) probe->DropData(b); else pp->DropData(b); delete pp; pp=0; } return res; } const char *TorrentPeer::Packet::GetPacketTypeText() const { const char *const text_table[]={ "keep-alive", "choke", "unchoke", "interested", "uninterested", "have", "bitfield", "request", "piece", "cancel", "port", "10", "11", "12", "suggest-piece", "have-all", "have-none", "reject-request", "allowed-fast", "18", "19", "extended", }; return text_table[type+1]; } TorrentPeer::unpack_status_t TorrentPeer::Packet::Unpack(const Buffer *b) { unpacked=0; if(b->Size()<4) return b->Eof()?UNPACK_PREMATURE_EOF:UNPACK_NO_DATA_YET; length=b->UnpackUINT32BE(0); unpacked+=4; if(length==0) { type=MSG_KEEPALIVE; return UNPACK_SUCCESS; } if(length<0 || length>1024*1024) { LogError(4,"invalid length %d",length); return UNPACK_WRONG_FORMAT; } if(b->Size()Eof()?UNPACK_PREMATURE_EOF:UNPACK_NO_DATA_YET; int t=b->UnpackUINT8(4); unpacked++; if(!is_valid_reply(t)) { LogError(4,"unknown packet type %d, length %d",t,length); return UNPACK_WRONG_FORMAT; } type=(packet_type)t; return UNPACK_SUCCESS; } bool TorrentPeer::AddressEq(const TorrentPeer *o) const { return !memcmp(&addr,&o->addr,sizeof(addr)); } const char *TorrentPeer::GetName() const { xstring& name=xstring::format("[%s]:%d",addr.address(),addr.port()); if(tracker_no==TR_ACCEPTED) name.append("/A"); else if(tracker_no==TR_DHT) name.append("/D"); else if(tracker_no==TR_PEX) name.append("/X"); else if(parent->trackers.count()>1) name.appendf("/%d",tracker_no+1); return name; } const char *TorrentPeer::Status() { if(sock==-1) { if(last_dc) return xstring::format("Disconnected (%s)",last_dc.get()); return _("Not connected"); } if(!connected) return _("Connecting..."); if(!peer_id) return _("Handshaking..."); xstring &buf=xstring::format("dn:%s %sup:%s %s", xhuman(peer_recv),peer_recv_rate.GetStrS(), xhuman(peer_sent),peer_send_rate.GetStrS()); if(peer_interested) buf.append("peer-interested "); if(peer_choking) buf.append("peer-choking "); if(am_interested) buf.append("am-interested "); if(am_choking) buf.append("am-choking "); if(parent->HasMetadata()) { if(peer_complete_piecestotal_pieces) buf.appendf("complete:%u/%u (%u%%)",peer_complete_pieces,parent->total_pieces, peer_complete_pieces*100/parent->total_pieces); else buf.append("complete"); } return buf; } TorrentPeer::Packet::Packet(packet_type t) { type=t; length=0; if(type>=0) length+=1; } void TorrentPeer::Packet::Pack(SMTaskRef& b) { b->PackUINT32BE(length); if(type>=0) b->PackUINT8(type); } TorrentPeer::PacketBitField::PacketBitField(const BitField *bf) : Packet(MSG_BITFIELD) { bitfield=new BitField(); bitfield->set(*bf); length+=bitfield->count(); } TorrentPeer::PacketBitField::~PacketBitField() { } TorrentPeer::unpack_status_t TorrentPeer::PacketBitField::Unpack(const Buffer *b) { unpack_status_t res; res=Packet::Unpack(b); if(res!=UNPACK_SUCCESS) return res; int bytes=length+4-unpacked; bitfield=new BitField(bytes*8); memcpy(bitfield->get_non_const(),b->Get()+unpacked,bytes); unpacked+=bytes; return UNPACK_SUCCESS; } void TorrentPeer::PacketBitField::ComputeLength() { Packet::ComputeLength(); length+=bitfield->count(); } void TorrentPeer::PacketBitField::Pack(SMTaskRef& b) { Packet::Pack(b); b->Put((const char*)(bitfield->get()),bitfield->count()); } TorrentPeer::_PacketIBL::_PacketIBL(packet_type t,unsigned i,unsigned b,unsigned l) : Packet(t), index(i), begin(b), req_length(l) { length+=12; } TorrentPeer::unpack_status_t TorrentPeer::_PacketIBL::Unpack(const Buffer *b) { unpack_status_t res; res=Packet::Unpack(b); if(res!=UNPACK_SUCCESS) return res; index=b->UnpackUINT32BE(unpacked);unpacked+=4; begin=b->UnpackUINT32BE(unpacked);unpacked+=4; req_length=b->UnpackUINT32BE(unpacked);unpacked+=4; return UNPACK_SUCCESS; } void TorrentPeer::_PacketIBL::ComputeLength() { Packet::ComputeLength(); length+=12; } void TorrentPeer::_PacketIBL::Pack(SMTaskRef& b) { Packet::Pack(b); b->PackUINT32BE(index); b->PackUINT32BE(begin); b->PackUINT32BE(req_length); } TorrentPeer::unpack_status_t TorrentPeer::Packet::UnpackBencoded(const Buffer *b,int *offset,int limit,Ref *out) { assert(limit<=b->Size()); int rest=limit-*offset; int rest0=rest; *out=BeNode::Parse(b->Get()+*offset,rest,&rest); if(!*out) { if(rest>0) return UNPACK_WRONG_FORMAT; return b->Eof()?UNPACK_PREMATURE_EOF:UNPACK_NO_DATA_YET; } *offset+=(rest0-rest); return UNPACK_SUCCESS; } BitField::BitField(int bits) { bit_length=bits; int bytes=(bits+7)/8; get_space(bytes); memset(buf,0,bytes); set_length(bytes); } bool BitField::get_bit(int i) const { return (*this)[i/8]&(0x80>>(i%8)); } void BitField::set_bit(int i,bool value) { unsigned char &b=(*this)[i/8]; int v=(0x80>>(i%8)); if(value) b|=v; else b&=~v; } bool BitField::has_any_set(int from,int to) const { for(int i=from; iStopped()) { LogNote(4,"black-delisting peer %s\n",bl.each_key().get()); bl.remove(bl.each_key()); } } } void TorrentBlackList::Add(const sockaddr_u &a,const char *t) { check_expire(); if(Listed(a)) return; LogNote(4,"black-listing peer %s (%s)\n",(const char*)a,t); bl.add(a.to_xstring(),new Timer(TimeIntervalR(t))); } bool TorrentBlackList::Listed(const sockaddr_u &a) { return bl.lookup(a.to_xstring())!=0; } TorrentListener::TorrentListener(int a,int t) : af(a), type(t), sock(-1), last_sent_udp_count(0) { } TorrentListener::~TorrentListener() { if(sock!=-1) close(sock); } void TorrentListener::FillAddress(int port) { addr.set_defaults(af,"torrent",port); } bool Torrent::NoTorrentCanAccept() { for(const Torrent *t=torrents.each_begin(); t; t=torrents.each_next()) { if(t->CanAccept()) return false; } return true; } int TorrentListener::Do() { int m=STALL; if(error) return m; if(sock==-1) { int proto=(type==SOCK_STREAM?IPPROTO_TCP:IPPROTO_UDP); sock=SocketCreateUnbound(af,type,proto,0); if(sock==-1) { if(NonFatalError(errno)) return m; error=Error::Fatal(_("cannot create socket of address family %d"),addr.sa.sa_family); return MOVED; } SocketSinglePF(sock,af); // Try to assign a port from given range Range range(ResMgr::Query("torrent:port-range",0)); // but first try already allocated port int prefer_port=Torrent::GetPort(); if(prefer_port) { ReuseAddress(sock); // try to reuse address. FillAddress(prefer_port); if(addr.bind_to(sock)==0) goto bound; LogError(1,"bind(%s): %s",addr.to_string(),strerror(errno)); } for(int t=0; ; t++) { if(t>=10) { close(sock); sock=-1; TimeoutS(1); // retry later. return m; } if(t==9) ReuseAddress(sock); // try to reuse address. int port=0; if(!range.IsFull()) port=range.Random(); if(!port && type==SOCK_DGRAM) port=Range("1024-65535").Random(); if(!port) break; // nothing to bind FillAddress(port); if(addr.bind_to(sock)==0) break; int saved_errno=errno; // Fail unless socket was already taken if(errno!=EINVAL && errno!=EADDRINUSE) { LogError(0,"bind(%s): %s",addr.to_string(),strerror(saved_errno)); close(sock); sock=-1; if(NonFatalError(errno)) { TimeoutS(1); return m; } error=Error::Fatal(_("Cannot bind a socket for torrent:port-range")); return MOVED; } LogError(10,"bind(%s): %s",addr.to_string(),strerror(saved_errno)); } bound: if(type==SOCK_STREAM) listen(sock,5); // get the allocated port socklen_t addr_len=sizeof(addr); getsockname(sock,&addr.sa,&addr_len); LogNote(4,"listening on %s %s",type==SOCK_STREAM?"tcp":"udp",addr.to_string()); m=MOVED; if(type==SOCK_DGRAM && Torrent::dht) Torrent::GetDHT(af)->Load(); } if(type==SOCK_DGRAM) { if(!Ready(sock,POLLIN)) { Block(sock,POLLIN); return m; } char buf[0x4000]; sockaddr_u src; socklen_t src_len=sizeof(src); int res=recvfrom(sock,buf,sizeof(buf),0,&src.sa,&src_len); if(res==-1) { if(!E_RETRY(errno)) LogError(9,"recvfrom: %s",strerror(errno)); Block(sock,POLLIN); return m; } if(res==0) return MOVED; rate.Add(1); Torrent::DispatchUDP(buf,res,src); return MOVED; } if(rate.Get()>5 || Torrent::NoTorrentCanAccept()) { TimeoutS(1); return m; } if(!Ready(sock,POLLIN)) { Block(sock,POLLIN); return m; } sockaddr_u remote_addr; int a=SocketAccept(sock,&remote_addr); if(a==-1) { Block(sock,POLLIN); return m; } rate.Add(1); LogNote(3,_("Accepted connection from [%s]:%d"),remote_addr.address(),remote_addr.port()); (void)new TorrentDispatcher(a,&remote_addr); m=MOVED; return m; } bool TorrentListener::MaySendUDP() { // limit udp rate if(last_sent_udp_count>=10 && now==last_sent_udp) UpdateNow(); TimeDiff time_passed(now,last_sent_udp); if(time_passed.MilliSeconds()<1) { if(last_sent_udp_count>=10) { Timeout(1); return false; } last_sent_udp_count++; } else { last_sent_udp_count=0; last_sent_udp=now; } // check if output buffer is available struct pollfd pfd; pfd.fd=sock; pfd.events=POLLOUT; pfd.revents=0; int res=poll(&pfd,1,0); if(res>0) return true; Block(sock,POLLOUT); return false; } int TorrentListener::SendUDP(const sockaddr_u& a,const xstring& buf) { int res=sendto(sock,buf,buf.length(),0,&a.sa,a.addr_len()); if(res==-1) LogError(0,"sendto(%s): %s",a.to_string(),strerror(errno)); return res; } void Torrent::DispatchUDP(const char *buf,int len,const sockaddr_u& src) { int rest; if(buf[0]=='d' && buf[len-1]=='e' && dht) { Ref msg(BeNode::Parse(buf,len,&rest)); if(!msg) goto unknown; const SMTaskRef &d=Torrent::GetDHT(src); d->Enter(); d->HandlePacket(msg.get_non_const(),src); d->Leave(); } else if(buf[0]==0x41) { LogRecv(9,xstring::format("uTP SYN v1 from %s {%s}",src.to_string(),xstring::get_tmp(buf,len).hexdump())); } else { unknown: LogRecv(4,xstring::format("udp from %s {%s}",src.to_string(),xstring::get_tmp(buf,len).hexdump())); } } void Torrent::Dispatch(const xstring& info_hash,int sock,const sockaddr_u *remote_addr,IOBuffer *recv_buf) { Torrent *t=FindTorrent(info_hash); if(!t) { LogError(3,_("peer sent unknown info_hash=%s in handshake"),info_hash.hexdump()); close(sock); Delete(recv_buf); return; } t->Accept(sock,remote_addr,recv_buf); } TorrentDispatcher::TorrentDispatcher(int s,const sockaddr_u *a) : sock(s), addr(*a), recv_buf(new IOBufferFDStream(new FDStream(sock,""),IOBuffer::GET)), timeout_timer(60), peer_name(addr.to_xstring()) { } TorrentDispatcher::~TorrentDispatcher() { if(sock!=-1) close(sock); } int TorrentDispatcher::Do() { if(timeout_timer.Stopped()) { LogError(1,_("peer handshake timeout")); Delete(this); return MOVED; } unsigned proto_len=0; if(recv_buf->Size()>0) proto_len=recv_buf->UnpackUINT8(); if((unsigned)recv_buf->Size()<1+proto_len+8+SHA1_DIGEST_SIZE) { if(recv_buf->Eof()) { if(recv_buf->Size()>0) LogError(1,_("peer short handshake")); else LogError(4,_("peer closed just accepted connection")); Delete(this); return MOVED; } return STALL; } int unpacked=1; const char *data=recv_buf->Get(); unpacked+=proto_len; unpacked+=8; // 8 bytes are reserved xstring peer_info_hash(data+unpacked,SHA1_DIGEST_SIZE); unpacked+=SHA1_DIGEST_SIZE; Torrent::Dispatch(peer_info_hash,sock,&addr,recv_buf.borrow()); sock=-1; Delete(this); return MOVED; } /// TorrentJob::TorrentJob(Torrent *t) : torrent(t), completed(false), done(false) { } TorrentJob::~TorrentJob() { } void TorrentJob::PrepareToDie() { done=true; torrent=0; Job::PrepareToDie(); } int TorrentJob::Do() { if(done) return STALL; if(torrent->Done()) { done=true; const Error *e=torrent->GetInvalidCause(); if(e) eprintf("%s\n",e->Text()); return MOVED; } if(!completed && torrent->Complete()) { if(parent->WaitsFor(this) && !torrent->IsSharing()) { PrintStatus(1,""); printf(_("Seeding in background...\n")); parent->RemoveWaiting(this); } completed=true; return MOVED; } return STALL; } xstring& TorrentJob::FormatStatus(xstring& s,int v,const char *tab) { if(torrent->IsDownloading()) torrent->CalcPiecesStats(); const char *name=torrent->GetName(); if(name) s.appendf("%sName: %s\n",tab,name); const char *status=torrent->Status(); if(*status) s.appendf("%s%s\n",tab,status); if(torrent->IsDownloading()) { s.appendf("%spiece availability: min %u, avg %.2f, %d%% available\n",tab, torrent->MinPieceSources(),torrent->AvgPieceSources(),torrent->PiecesAvailablePct()); if(torrent->GetRatio()>0) { s.appendf("%sratio: %.2f/%.2f/%.2f\n",tab, torrent->GetMinPerPieceRatio(),torrent->GetRatio(), torrent->GetMaxPerPieceRatio()); } } if(v>2) { s.appendf("%sinfo hash: %s\n",tab,torrent->GetInfoHash().hexdump()); if(torrent->HasMetadata()) { s.appendf("%stotal length: %llu\n",tab,torrent->TotalLength()); s.appendf("%spiece length: %u\n",tab,torrent->PieceLength()); } } if(v>1) { if(torrent->Trackers().count()==1) { s.appendf("%stracker: %s - %s\n",tab,torrent->Trackers()[0]->GetURL(), torrent->Trackers()[0]->Status()); } else if(torrent->Trackers().count()>1) { s.appendf("%strackers:\n",tab); for(int i=0; iTrackers().count(); i++) { s.appendf("%s%2d. %s - %s\n",tab,i+1,torrent->Trackers()[i]->GetURL(), torrent->Trackers()[i]->Status()); } } const char *dht_status=torrent->DHT_Status(); if(*dht_status) s.appendf("%sDHT: %s\n",tab,dht_status); } if(torrent->ShuttingDown()) return s; if(torrent->GetPeersCount()<=5 || v>1) { const TaskRefArray& peers=torrent->GetPeers(); int not_connected_peers=torrent->GetPeersCount()-torrent->GetConnectedPeersCount(); if(v<=2 && not_connected_peers>0) s.appendf("%s not connected peers: %d\n",tab,not_connected_peers); for(int i=0; iConnected() || v>2) s.appendf("%s %s: %s\n",tab,peers[i]->GetName(),peers[i]->Status()); } } else { s.appendf("%s peers:%d connected:%d active:%d complete:%d\n",tab, torrent->GetPeersCount(),torrent->GetConnectedPeersCount(), torrent->GetActivePeersCount(),torrent->GetCompletePeersCount()); } return s; } void TorrentJob::ShowRunStatus(const SMTaskRef& s) { const xstring& status=torrent->Status(); const char *name=torrent->GetName(); int w=s->GetWidthDelayed()-status.length()-3; if(w<8) w=8; if(w>40) w=40; s->Show("%s: %s",squeeze_file_name(name,w),status.get()); } int TorrentJob::AcceptSig(int sig) { if(!torrent || torrent->ShuttingDown()) return WANTDIE; torrent->Shutdown(); return MOVED; } #include "CmdExec.h" CDECL_BEGIN #include CDECL_END CMD(torrent) { Torrent::ClassInit(); #define args (parent->args) #define eprintf parent->eprintf enum { OPT_OUTPUT_DIRECTORY, OPT_FORCE_VALID, OPT_DHT_BOOTSTRAP, OPT_SHARE, OPT_ONLY_NEW, OPT_ONLY_INCOMPLETE, }; static const struct option torrent_opts[]= { {"output-directory",required_argument,0,OPT_OUTPUT_DIRECTORY}, {"force-valid",no_argument,0,OPT_FORCE_VALID}, {"dht-bootstrap",required_argument,0,OPT_DHT_BOOTSTRAP}, {"share",no_argument,0,OPT_SHARE}, {"only-new",no_argument,0,OPT_ONLY_NEW}, {"only-incomplete",no_argument,0,OPT_ONLY_INCOMPLETE}, {0} }; const char *output_dir=0; const char *dht_bootstrap=0; bool force_valid=false; bool share=false; bool only_new=false; bool only_incomplete=false; args->rewind(); int opt; while((opt=args->getopt_long("O:",torrent_opts,0))!=EOF) { switch(opt) { case(OPT_OUTPUT_DIRECTORY): case('O'): output_dir=optarg; break; case(OPT_FORCE_VALID): force_valid=true; break; case(OPT_DHT_BOOTSTRAP): dht_bootstrap=optarg; Torrent::BootstrapDHT(dht_bootstrap); break; case(OPT_SHARE): share=true; break; case(OPT_ONLY_NEW): only_new=true; // fallthrough case(OPT_ONLY_INCOMPLETE): only_incomplete=true; break; case('?'): try_help: eprintf(_("Try `help %s' for more information.\n"),args->a0()); return 0; } } args->back(); if(share && output_dir) { eprintf(_("%s: --share conflicts with --output-directory.\n"),args->a0()); return 0; } if(share && only_new) { eprintf(_("%s: --share conflicts with --only-new.\n"),args->a0()); return 0; } if(share && only_incomplete) { eprintf(_("%s: --share conflicts with --only-incomplete.\n"),args->a0()); return 0; } xstring_ca torrent_opt(args->Combine(0,args->getindex()+1)); xstring_ca cwd(xgetcwd()); if(output_dir) { output_dir=dir_file(cwd,expand_home_relative(output_dir)); output_dir=alloca_strdup(output_dir); } else output_dir=cwd; Ref args_g(new ArgV(args->a0())); const char *torrent; while((torrent=args->getnext())!=0) { int globbed=0; if(share || !url::is_url(torrent)) { glob_t pglob; glob(expand_home_relative(torrent),0,0,&pglob); if(pglob.gl_pathc>0) { for(unsigned i=0; iAdd(dir_file(cwd,f)); globbed++; } } } globfree(&pglob); } if(!globbed) args_g->Add(torrent); } torrent=args_g->getnext(); if(!torrent) { if(dht_bootstrap) return 0; if(share) eprintf(_("%s: Please specify a file or directory to share.\n"),args->a0()); else eprintf(_("%s: Please specify meta-info file or URL.\n"),args->a0()); goto try_help; } while(torrent) { Torrent *t=new Torrent(torrent,cwd,output_dir); if(force_valid) t->ForceValid(); if(share) t->Share(); if(only_new) t->StopIfKnown(); if(only_incomplete) t->StopIfComplete(); TorrentJob *tj=new TorrentJob(t); tj->cmdline.set(xstring::cat(torrent_opt," ",torrent,NULL)); parent->AddNewJob(tj); torrent=args_g->getnext(); } return 0; #undef args } #include "modconfig.h" #ifndef MODULE_CMD_TORRENT # define module_init cmd_torrent_module_init #endif CDECL void module_init() { Torrent::ClassInit(); CmdExec::RegisterCommand("torrent",cmd_torrent,0, N_("Start BitTorrent job for the given torrent-files, which can be a local file,\n" "URL, magnet link or plain info_hash written in hex or base32. Local wildcards\n" "are expanded. Options:\n" " -O specifies base directory where files should be placed\n" " --force-valid skip file validation\n" " --dht-bootstrap= bootstrap DHT by sending a query to the node\n" " --share share specified file or directory\n")); }