Blame src/DHT.cc

Packit Service a2489d
/*
Packit Service a2489d
 * lftp - file transfer program
Packit Service a2489d
 *
Packit Service a2489d
 * Copyright (c) 2012-2016 by Alexander V. Lukyanov (lav@yars.free.net)
Packit Service a2489d
 *
Packit Service a2489d
 * This program is free software; you can redistribute it and/or modify
Packit Service a2489d
 * it under the terms of the GNU General Public License as published by
Packit Service a2489d
 * the Free Software Foundation; either version 3 of the License, or
Packit Service a2489d
 * (at your option) any later version.
Packit Service a2489d
 *
Packit Service a2489d
 * This program is distributed in the hope that it will be useful,
Packit Service a2489d
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit Service a2489d
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit Service a2489d
 * GNU General Public License for more details.
Packit Service a2489d
 *
Packit Service a2489d
 * You should have received a copy of the GNU General Public License
Packit Service a2489d
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
Packit Service a2489d
 */
Packit Service a2489d
Packit Service a2489d
#include <config.h>
Packit Service a2489d
#include <stdlib.h>
Packit Service a2489d
#include <assert.h>
Packit Service a2489d
#include <sys/types.h>
Packit Service a2489d
#include <sys/stat.h>
Packit Service a2489d
#include <sys/socket.h>
Packit Service a2489d
#include <netinet/in.h>
Packit Service a2489d
#include <arpa/inet.h>
Packit Service a2489d
#include <unistd.h>
Packit Service a2489d
#include <fcntl.h>
Packit Service a2489d
#include <errno.h>
Packit Service a2489d
#include <sha1.h>
Packit Service a2489d
Packit Service a2489d
#include "Torrent.h"
Packit Service a2489d
#include "DHT.h"
Packit Service a2489d
#include "log.h"
Packit Service a2489d
#include "url.h"
Packit Service a2489d
#include "misc.h"
Packit Service a2489d
#include "plural.h"
Packit Service a2489d
Packit Service a2489d
DHT::DHT(int af,const xstring& id)
Packit Service a2489d
   : af(af), rate_limit("DHT"),
Packit Service a2489d
     sent_req_expire_scan(5), search_cleanup_timer(5),
Packit Service a2489d
     refresh_timer(1), nodes_cleanup_timer(30), save_timer(300),
Packit Service a2489d
     node_id(id.copy()), t(random())
Packit Service a2489d
{
Packit Service a2489d
   LogNote(10,"creating DHT with id=%s",node_id.hexdump());
Packit Service a2489d
   Reconfig(0);
Packit Service a2489d
}
Packit Service a2489d
DHT::~DHT()
Packit Service a2489d
{
Packit Service a2489d
}
Packit Service a2489d
void DHT::Bootstrap()
Packit Service a2489d
{
Packit Service a2489d
   // run bootstrap search
Packit Service a2489d
   LogNote(9,"bootstrapping");
Packit Service a2489d
   Search *s=new Search(node_id);
Packit Service a2489d
   s->Bootstrap();
Packit Service a2489d
   StartSearch(s);
Packit Service a2489d
}
Packit Service a2489d
int DHT::Do()
Packit Service a2489d
{
Packit Service a2489d
   int m=STALL;
Packit Service a2489d
   if(state_io) {
Packit Service a2489d
      if(state_io->GetDirection()==IOBuffer::GET) {
Packit Service a2489d
	 if(state_io->Error()) {
Packit Service a2489d
	    LogError(1,"loading state: %s",state_io->ErrorText());
Packit Service a2489d
	    state_io=0;
Packit Service a2489d
	    m=MOVED;
Packit Service a2489d
	 } else if(state_io->Eof()) {
Packit Service a2489d
	    Load(state_io);
Packit Service a2489d
	    state_io=0;
Packit Service a2489d
	    m=MOVED;
Packit Service a2489d
	 }
Packit Service a2489d
      } else {
Packit Service a2489d
	 if(state_io->Error())
Packit Service a2489d
	    LogError(1,"saving state: %s",state_io->ErrorText());
Packit Service a2489d
	 if(state_io->Done()) {
Packit Service a2489d
	    state_io=0;
Packit Service a2489d
	    m=MOVED;
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   if(sent_req_expire_scan.Stopped()) {
Packit Service a2489d
      for(const Request *r=sent_req.each_begin(); r; r=sent_req.each_next()) {
Packit Service a2489d
	 if(!r->Expired())
Packit Service a2489d
	    continue;
Packit Service a2489d
	 Ref<Request> rr(sent_req.borrow(sent_req.each_key()));
Packit Service a2489d
	 LogError(4,"DHT request %s to %s timed out",r->data->lookup_str("q").get(),r->addr.to_string());
Packit Service a2489d
	 Node *n=nodes.lookup(r->GetNodeId());
Packit Service a2489d
	 if(n) {
Packit Service a2489d
	    n->LostPing();
Packit Service a2489d
	    LogNote(4,"DHT node %s has lost %d packets",n->GetName(),n->ping_lost_count);
Packit Service a2489d
	 }
Packit Service a2489d
	 const xstring& target=r->GetSearchTarget();
Packit Service a2489d
	 if(target) {
Packit Service a2489d
	    Search *s=search.lookup(target);
Packit Service a2489d
	    // if we have lost a search request and have not received any
Packit Service a2489d
	    // reply yet - try to restart the search with good nodes.
Packit Service a2489d
	    if(s && !s->best_node_id)
Packit Service a2489d
	       RestartSearch(s);
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      sent_req_expire_scan.Reset();
Packit Service a2489d
   }
Packit Service a2489d
   if(search_cleanup_timer.Stopped()) {
Packit Service a2489d
      for(Search *s=search.each_begin(); s; s=search.each_next()) {
Packit Service a2489d
	 if(s->search_timer.Stopped())
Packit Service a2489d
	    search.remove(search.each_key());
Packit Service a2489d
      }
Packit Service a2489d
      search_cleanup_timer.Reset();
Packit Service a2489d
   }
Packit Service a2489d
   if(nodes_cleanup_timer.Stopped()) {
Packit Service a2489d
      for(Node *n=nodes.each_begin(); n; n=nodes.each_next()) {
Packit Service a2489d
	 if(n->IsBad()) {
Packit Service a2489d
	    LogNote(9,"removing bad node %s",n->GetName());
Packit Service a2489d
	    RemoveNode(n);
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      if(nodes.count()>MAX_NODES) {
Packit Service a2489d
	 // remove some nodes.
Packit Service a2489d
	 int to_remove=nodes.count()-MAX_NODES;
Packit Service a2489d
	 for(Node *n=nodes.each_begin(); n && to_remove>0; n=nodes.each_next()) {
Packit Service a2489d
	    if(!n->IsGood() && !n->in_routes) {
Packit Service a2489d
	       LogNote(9,"removing node %s (not good)",n->GetName());
Packit Service a2489d
	       RemoveNode(n);
Packit Service a2489d
	       to_remove--;
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
	 for(Node *n=nodes.each_begin(); n && to_remove>0; n=nodes.each_next()) {
Packit Service a2489d
	    if(!n->in_routes && !n->responded) {
Packit Service a2489d
	       LogNote(9,"removing node %s (never responded)",n->GetName());
Packit Service a2489d
	       RemoveNode(n);
Packit Service a2489d
	       to_remove--;
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
	 LogNote(9,"node count=%d",nodes.count());
Packit Service a2489d
      }
Packit Service a2489d
      for(int i=1; i
Packit Service a2489d
	 if(routes[i]->nodes.count()>K) {
Packit Service a2489d
	    xarray<Node*>& nodes=routes[i]->nodes;
Packit Service a2489d
	    int q_num=PingQuestionable(nodes,nodes.count()-K);
Packit Service a2489d
	    if(nodes.count()>K+q_num)
Packit Service a2489d
	       routes[i]->RemoveNode(K); // too many candidates, trim one
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      // remove bad peers
Packit Service a2489d
      for(KnownTorrent *t=torrents.each_begin(); t; t=torrents.each_next()) {
Packit Service a2489d
	 xarray_p<Peer>& p=t->peers;
Packit Service a2489d
	 int seeds=0;
Packit Service a2489d
	 for(int i=0; i
Packit Service a2489d
	    if(!p[i]->IsGood())
Packit Service a2489d
	       p.remove(i--);
Packit Service a2489d
	    else
Packit Service a2489d
	       seeds+=p[i]->seed;
Packit Service a2489d
	 }
Packit Service a2489d
	 LogNote(9,"torrent %s has %d known peers (%d seeds)",
Packit Service a2489d
	    torrents.each_key().hexdump(),p.count(),seeds);
Packit Service a2489d
	 if(p.count()==0)
Packit Service a2489d
	    torrents.remove(torrents.each_key());
Packit Service a2489d
      }
Packit Service a2489d
      nodes_cleanup_timer.Reset();
Packit Service a2489d
      if(save_timer.Stopped()) {
Packit Service a2489d
	 Save();
Packit Service a2489d
	 save_timer.Reset();
Packit Service a2489d
      }
Packit Service a2489d
      if(nodes.count()>0 && routes[0]->nodes.count()<2 && search.count()==0)
Packit Service a2489d
	 Bootstrap(); // lost almost all nodes, try to bootstrap again
Packit Service a2489d
   }
Packit Service a2489d
   if(refresh_timer.Stopped()) {
Packit Service a2489d
      for(int i=0; i
Packit Service a2489d
	 if(!routes[i]->IsFresh()) {
Packit Service a2489d
	    LogNote(9,"refreshing route bucket %d (prefix=%s)",i,routes[i]->to_string());
Packit Service a2489d
	    // make random id in the range
Packit Service a2489d
	    int bytes=routes[i]->prefix_bits/8;
Packit Service a2489d
	    int bits=routes[i]->prefix_bits%8;
Packit Service a2489d
	    xstring random_id(routes[i]->prefix.get(),bytes+(bits>0));
Packit Service a2489d
	    if(bits>0) {
Packit Service a2489d
	       unsigned mask=(1<<(8-bits))-1;
Packit Service a2489d
	       assert(!(random_id[bytes]&mask));
Packit Service a2489d
	       random_id.get_non_const()[bytes]|=(random()/13)&mas;;
Packit Service a2489d
	    }
Packit Service a2489d
	    while(random_id.length()<20)
Packit Service a2489d
	       random_id.append(char(random()/13));
Packit Service a2489d
	    StartSearch(new Search(random_id));
Packit Service a2489d
	    routes[i]->fresh_timer.Reset();
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      refresh_timer.Reset();
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   // manual bootstrapping
Packit Service a2489d
   if(resolver) {
Packit Service a2489d
      if(resolver->Error()) {
Packit Service a2489d
	 LogError(1,"%s",resolver->ErrorMsg());
Packit Service a2489d
	 resolver=0;
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      } else if(resolver->Done()) {
Packit Service a2489d
	 const xarray<sockaddr_u>& r=resolver->Result();
Packit Service a2489d
	 for(int i=0; i
Packit Service a2489d
	    Torrent::GetDHT(r[i])->SendPing(r[i]);
Packit Service a2489d
	 resolver=0;
Packit Service a2489d
	 m=MOVED;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   if(!state_io && !resolver && bootstrap_nodes.count()>0) {
Packit Service a2489d
      xstring &b=*bootstrap_nodes.next();
Packit Service a2489d
      ParsedURL u(b);
Packit Service a2489d
      if(u.proto==0 && u.host)
Packit Service a2489d
	 resolver=new Resolver(u.host,u.port,"6881");
Packit Service a2489d
      m=MOVED;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   while(send_queue.count()>0 && MaySendMessage()) {
Packit Service a2489d
      SendMessage(send_queue.next().borrow());
Packit Service a2489d
      m=MOVED;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   return m;
Packit Service a2489d
}
Packit Service a2489d
BeNode *DHT::NewQuery(const char *q,xmap_p<BeNode>& a)
Packit Service a2489d
{
Packit Service a2489d
   xmap_p<BeNode> m;
Packit Service a2489d
   BeNode *n=new BeNode((const char*)&t,sizeof(t));
Packit Service a2489d
   m.add("t",n);
Packit Service a2489d
   t++;
Packit Service a2489d
   m.add("y",new BeNode("q",1));
Packit Service a2489d
   m.add("q",new BeNode(q));
Packit Service a2489d
   a.add("id",new BeNode(node_id));
Packit Service a2489d
   m.add("a",new BeNode(&a);;
Packit Service a2489d
   return new BeNode(&m);
Packit Service a2489d
}
Packit Service a2489d
BeNode *DHT::NewReply(const xstring& t0,xmap_p<BeNode>& r)
Packit Service a2489d
{
Packit Service a2489d
   xmap_p<BeNode> m;
Packit Service a2489d
   m.add("t",new BeNode(t0));
Packit Service a2489d
   m.add("y",new BeNode("r",1));
Packit Service a2489d
   r.add("id",new BeNode(node_id));
Packit Service a2489d
   m.add("r",new BeNode(&r);;
Packit Service a2489d
   return new BeNode(&m);
Packit Service a2489d
}
Packit Service a2489d
BeNode *DHT::NewError(const xstring& t0,int code,const char *msg)
Packit Service a2489d
{
Packit Service a2489d
   xmap_p<BeNode> m;
Packit Service a2489d
   m.add("t",new BeNode(t0));
Packit Service a2489d
   m.add("y",new BeNode("e",1));
Packit Service a2489d
   xarray_p<BeNode> e;
Packit Service a2489d
   e.append(new BeNode(code));
Packit Service a2489d
   e.append(new BeNode(msg));
Packit Service a2489d
   m.add("e",new BeNode(&e);;
Packit Service a2489d
   return new BeNode(&m);
Packit Service a2489d
}
Packit Service a2489d
const char *DHT::MessageType(BeNode *q)
Packit Service a2489d
{
Packit Service a2489d
   const xstring& y=q->lookup_str("y");
Packit Service a2489d
   const char *msg_type="message";
Packit Service a2489d
   if(y.eq("q"))
Packit Service a2489d
      msg_type=q->lookup_str("q");
Packit Service a2489d
   else if(y.eq("r"))
Packit Service a2489d
      msg_type="response";
Packit Service a2489d
   else if(y.eq("e"))
Packit Service a2489d
      msg_type="error";
Packit Service a2489d
   return msg_type;
Packit Service a2489d
}
Packit Service a2489d
void DHT::SendMessage(BeNode *q,const sockaddr_u& a,const xstring& id)
Packit Service a2489d
{
Packit Service a2489d
   if(send_queue.count()>MAX_SEND_QUEUE) {
Packit Service a2489d
      LogError(9,"tail dropping output message");
Packit Service a2489d
      delete q;
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
   send_queue.push(new Request(q,a,id));
Packit Service a2489d
}
Packit Service a2489d
void DHT::SendMessage(Request *req)
Packit Service a2489d
{
Packit Service a2489d
   req->expire_timer.Reset();
Packit Service a2489d
   BeNode *q=req->data.get_non_const();
Packit Service a2489d
   const sockaddr_u& a=req->addr;
Packit Service a2489d
   LogSend(4,xstring::format("sending DHT %s to %s %s",MessageType(q),
Packit Service a2489d
      a.to_string(),q->Format1()));
Packit Service a2489d
   int res=-1;
Packit Service a2489d
   res=Torrent::GetUDPSocket(af)->SendUDP(a,q->Pack());
Packit Service a2489d
   if(res!=-1 && q->lookup_str("y").eq("q")) {
Packit Service a2489d
      sent_req.add(q->lookup_str("t"),req);
Packit Service a2489d
      rate_limit.BytesPut(res);
Packit Service a2489d
   } else {
Packit Service a2489d
      delete req;
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
bool DHT::MaySendMessage()
Packit Service a2489d
{
Packit Service a2489d
   return rate_limit.BytesAllowedToPut()>=256
Packit Service a2489d
      && Torrent::GetUDPSocket(af)->MaySendUDP();
Packit Service a2489d
}
Packit Service a2489d
void DHT::SendPing(const sockaddr_u& a,const xstring& id)
Packit Service a2489d
{
Packit Service a2489d
   if(a.port()==0 || a.is_private() || a.is_reserved() || a.is_multicast())
Packit Service a2489d
      return;
Packit Service a2489d
   Enter();
Packit Service a2489d
   xmap_p<BeNode> arg;
Packit Service a2489d
   SendMessage(NewQuery("ping",arg),a,id);
Packit Service a2489d
   Leave();
Packit Service a2489d
}
Packit Service a2489d
void DHT::SendPing(Node *n)
Packit Service a2489d
{
Packit Service a2489d
   SendPing(n->addr,n->id);
Packit Service a2489d
   n->ping_timer.Reset();
Packit Service a2489d
}
Packit Service a2489d
void DHT::AnnouncePeer(const Torrent *t)
Packit Service a2489d
{
Packit Service a2489d
   const xstring& info_hash=t->GetInfoHash();
Packit Service a2489d
   // check for duplicated announce
Packit Service a2489d
   if(search.exists(info_hash))
Packit Service a2489d
      return;
Packit Service a2489d
   Enter();
Packit Service a2489d
   Search *s=new Search(info_hash);
Packit Service a2489d
   s->WantPeers(t->Complete());
Packit Service a2489d
#if INET6
Packit Service a2489d
   // try to find nodes in the other AF if needed
Packit Service a2489d
   if(Torrent::GetDHT(af==AF_INET?AF_INET6:AF_INET)->nodes.count()<1)
Packit Service a2489d
      s->Bootstrap();
Packit Service a2489d
#endif
Packit Service a2489d
   StartSearch(s);
Packit Service a2489d
   Leave();
Packit Service a2489d
}
Packit Service a2489d
void DHT::DenouncePeer(const Torrent *t)
Packit Service a2489d
{
Packit Service a2489d
   // remove any search for this torrent
Packit Service a2489d
   search.remove(t->GetInfoHash());
Packit Service a2489d
}
Packit Service a2489d
int DHT::AddNodesToReply(xmap_p<BeNode> &r,const xstring& target,int max_count)
Packit Service a2489d
{
Packit Service a2489d
   xarray<Node*> n;
Packit Service a2489d
   FindNodes(target,n,max_count,true);
Packit Service a2489d
   xstring compact_nodes;
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      compact_nodes.append(n[i]->id);
Packit Service a2489d
      compact_nodes.append(n[i]->addr.compact());
Packit Service a2489d
   }
Packit Service a2489d
   r.add(af==AF_INET?"nodes":"nodes6",new BeNode(compact_nodes));
Packit Service a2489d
   return n.count();
Packit Service a2489d
}
Packit Service a2489d
int DHT::AddNodesToReply(xmap_p<BeNode> &r,const xstring& target,bool want_n4,bool want_n6)
Packit Service a2489d
{
Packit Service a2489d
   int nodes_count=0;
Packit Service a2489d
   if(want_n4)
Packit Service a2489d
      nodes_count+=Torrent::GetDHT(AF_INET)->AddNodesToReply(r,target,K);
Packit Service a2489d
   if(want_n6)
Packit Service a2489d
      nodes_count+=Torrent::GetDHT(AF_INET6)->AddNodesToReply(r,target,K);
Packit Service a2489d
   return nodes_count;
Packit Service a2489d
}
Packit Service a2489d
const xstring& DHT::Node::GetToken()
Packit Service a2489d
{
Packit Service a2489d
   if(!my_token || token_timer.Stopped()) {
Packit Service a2489d
      // make new token
Packit Service a2489d
      my_last_token.set(my_token);
Packit Service a2489d
      my_token.truncate();
Packit Service a2489d
      for(int i=0; i<16; i++)
Packit Service a2489d
	 my_token.append(char(random()/13));
Packit Service a2489d
      token_timer.Reset();
Packit Service a2489d
   }
Packit Service a2489d
   return my_token;
Packit Service a2489d
}
Packit Service a2489d
bool DHT::Node::TokenIsValid(const xstring& token) const
Packit Service a2489d
{
Packit Service a2489d
   if(!token || !my_token || token_timer.Stopped())
Packit Service a2489d
      return false;
Packit Service a2489d
   return token.eq(my_token) || token.eq(my_last_token);
Packit Service a2489d
}
Packit Service a2489d
const xstring& DHT::Request::GetSearchTarget() const
Packit Service a2489d
{
Packit Service a2489d
   const BeNode *a=data->lookup("a",BeNode::BE_DICT);
Packit Service a2489d
   if(!a)
Packit Service a2489d
      return xstring::null;
Packit Service a2489d
   const xstring& q=data->lookup_str("q");
Packit Service a2489d
   const char *target=q.eq("find_node")?"target":"info_hash";
Packit Service a2489d
   return a->lookup_str(target);
Packit Service a2489d
}
Packit Service a2489d
void DHT::HandlePacket(BeNode *p,const sockaddr_u& src)
Packit Service a2489d
{
Packit Service a2489d
   LogRecv(4,xstring::format("received DHT %s from %s %s",MessageType(p),
Packit Service a2489d
      src.to_string(),p->Format1()));
Packit Service a2489d
   int pkt_len=p->str.length();
Packit Service a2489d
   const xstring& t=p->lookup_str("t");
Packit Service a2489d
   if(!t)
Packit Service a2489d
      return;
Packit Service a2489d
   const xstring& y=p->lookup_str("y");
Packit Service a2489d
   if(!y)
Packit Service a2489d
      return;
Packit Service a2489d
   if(y.eq("q")) { // query
Packit Service a2489d
      if(rate_limit.BytesAllowedToGet()
Packit Service a2489d
	 LogError(9,"dropping incoming message (rate limit exceeded)");
Packit Service a2489d
	 return;
Packit Service a2489d
      }
Packit Service a2489d
      rate_limit.BytesGot(pkt_len);
Packit Service a2489d
      const xstring& q=p->lookup_str("q");
Packit Service a2489d
      if(!q)
Packit Service a2489d
	 return;
Packit Service a2489d
      BeNode *a=p->lookup("a",BeNode::BE_DICT);
Packit Service a2489d
      if(!a)
Packit Service a2489d
	 return;
Packit Service a2489d
      const xstring& id=a->lookup_str("id");
Packit Service a2489d
      if(id.length()!=20)
Packit Service a2489d
	 return;
Packit Service a2489d
      Node *node=FoundNode(id,src,false);
Packit Service a2489d
      if(!node)
Packit Service a2489d
	 return;
Packit Service a2489d
Packit Service a2489d
      xmap_p<BeNode> r;
Packit Service a2489d
      if(src.family()==AF_INET)
Packit Service a2489d
	 r.add("ip",new BeNode(src.compact_addr()));
Packit Service a2489d
Packit Service a2489d
      bool want_n4=false;
Packit Service a2489d
      bool want_n6=false;
Packit Service a2489d
      BeNode *want=a->lookup("want",BeNode::BE_LIST);
Packit Service a2489d
      if(want) {
Packit Service a2489d
	 for(int i=0; i<want->list.count(); i++) {
Packit Service a2489d
	    BeNode *w=want->list[i];
Packit Service a2489d
	    if(w->type!=BeNode::BE_STR)
Packit Service a2489d
	       continue;
Packit Service a2489d
	    if(w->str.eq("n4"))
Packit Service a2489d
	       want_n4=true;
Packit Service a2489d
	    if(w->str.eq("n6"))
Packit Service a2489d
	       want_n6=true;
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      if(!want_n4 && !want_n6) {
Packit Service a2489d
	 want_n4=(src.family()==AF_INET);
Packit Service a2489d
	 want_n6=(src.family()==AF_INET6);
Packit Service a2489d
      }
Packit Service a2489d
Packit Service a2489d
      if(q.eq("ping")) {
Packit Service a2489d
	 LogSend(5,xstring::format("DHT ping reply to %s",src.to_string()));
Packit Service a2489d
	 SendMessage(NewReply(t,r),src);
Packit Service a2489d
      } else if(q.eq("find_node")) {
Packit Service a2489d
	 const xstring& target=a->lookup_str("target");
Packit Service a2489d
	 if(!target)
Packit Service a2489d
	    return;
Packit Service a2489d
	 int nodes_count=AddNodesToReply(r,target,want_n4,want_n6);
Packit Service a2489d
	 LogSend(5,xstring::format("DHT find_node reply with %d nodes to %s",nodes_count,src.to_string()));
Packit Service a2489d
	 SendMessage(NewReply(t,r),src);
Packit Service a2489d
      } else if(q.eq("get_peers")) {
Packit Service a2489d
	 const xstring& info_hash=a->lookup_str("info_hash");
Packit Service a2489d
	 if(info_hash.length()!=20)
Packit Service a2489d
	    return;
Packit Service a2489d
	 bool noseed=a->lookup_int("noseed");
Packit Service a2489d
	 KnownTorrent *torrent=torrents.lookup(info_hash);
Packit Service a2489d
	 int nodes_count=0;
Packit Service a2489d
	 int values_count=0;
Packit Service a2489d
	 if(!torrent || torrent->peers.count()==0) {
Packit Service a2489d
	    nodes_count=AddNodesToReply(r,info_hash,want_n4,want_n6);
Packit Service a2489d
	 } else {
Packit Service a2489d
	    xarray_p<Peer>& p=torrent->peers;
Packit Service a2489d
	    xarray_p<BeNode> values;
Packit Service a2489d
	    for(int i=0; i
Packit Service a2489d
	       Peer *peer=p[i];
Packit Service a2489d
	       if(noseed && peer->seed)
Packit Service a2489d
		  continue;
Packit Service a2489d
	       if(!peer->IsGood())
Packit Service a2489d
		  continue;
Packit Service a2489d
	       if(peer->compact_addr.family()==AF_INET && !want_n4)
Packit Service a2489d
		  continue;
Packit Service a2489d
#if INET6
Packit Service a2489d
	       if(peer->compact_addr.family()==AF_INET6 && !want_n6)
Packit Service a2489d
		  continue;
Packit Service a2489d
#endif
Packit Service a2489d
	       values.append(new BeNode(peer->compact_addr));
Packit Service a2489d
	       values_count++;
Packit Service a2489d
	    }
Packit Service a2489d
	    if(values_count>0)
Packit Service a2489d
	       r.add("values",new BeNode(&values));
Packit Service a2489d
	    else
Packit Service a2489d
	       nodes_count=AddNodesToReply(r,info_hash,want_n4,want_n6);
Packit Service a2489d
	 }
Packit Service a2489d
	 r.add("token",new BeNode(node->GetToken()));
Packit Service a2489d
	 LogSend(5,xstring::format("DHT get_peers reply with %d values and %d nodes to %s",
Packit Service a2489d
	    values_count,nodes_count,src.to_string()));
Packit Service a2489d
	 SendMessage(NewReply(t,r),src);
Packit Service a2489d
      } else if(q.eq("announce_peer")) {
Packit Service a2489d
	 // need a valid token
Packit Service a2489d
	 if(!node->TokenIsValid(a->lookup_str("token"))) {
Packit Service a2489d
	    SendMessage(NewError(t,ERR_PROTOCOL,"invalid token"),src);
Packit Service a2489d
	    return;
Packit Service a2489d
	 }
Packit Service a2489d
	 // ok, token is valid. Now add the peer.
Packit Service a2489d
	 const xstring& info_hash=a->lookup_str("info_hash");
Packit Service a2489d
	 if(info_hash.length()!=20)
Packit Service a2489d
	    return;
Packit Service a2489d
	 int port=a->lookup_int("port");
Packit Service a2489d
	 if(!port)
Packit Service a2489d
	    return;
Packit Service a2489d
	 bool seed=a->lookup_int("seed");
Packit Service a2489d
	 sockaddr_u peer_addr(src);
Packit Service a2489d
	 peer_addr.set_port(port);
Packit Service a2489d
	 AddPeer(info_hash,peer_addr.compact(),seed);
Packit Service a2489d
	 SendMessage(NewReply(t,r),src);
Packit Service a2489d
      } else if(q.eq("vote")) {
Packit Service a2489d
#if 0
Packit Service a2489d
	 // need a valid token
Packit Service a2489d
	 if(!node->TokenIsValid(a->lookup_str("token"))) {
Packit Service a2489d
	    SendMessage(NewError(t,ERR_PROTOCOL,"invalid token"),src);
Packit Service a2489d
	    return;
Packit Service a2489d
	 }
Packit Service a2489d
	 // target is sha1(info_hash+"rating")
Packit Service a2489d
	 const xstring& target=a->lookup_str("target");
Packit Service a2489d
	 if(target.length()!=20)
Packit Service a2489d
            return;
Packit Service a2489d
         unsigned vote=a->lookup_int("vote");
Packit Service a2489d
	 // store the vote
Packit Service a2489d
	 // return what?
Packit Service a2489d
	 SendMessage(NewReply(t,r),src);
Packit Service a2489d
#endif
Packit Service a2489d
      } else {
Packit Service a2489d
	 SendMessage(NewError(t,ERR_UNKNOWN_METHOD,"method unknown"),src);
Packit Service a2489d
      }
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   Ref<Request> req(sent_req.borrow(t));
Packit Service a2489d
   if(!req) {
Packit Service a2489d
      LogError(2,"got DHT reply with unknown `t' from %s",src.to_string());
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
   if(req->addr!=src) {
Packit Service a2489d
      LogError(2,"got DHT reply from %s instead of %s",src.to_string(),req->addr.to_string());
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   const xstring& q=req->data->lookup_str("q");
Packit Service a2489d
   if(y.eq("r")) { // reply
Packit Service a2489d
      BeNode *r=p->lookup("r",BeNode::BE_DICT);
Packit Service a2489d
      if(!r)
Packit Service a2489d
	 return;
Packit Service a2489d
      const xstring& id=r->lookup_str("id");
Packit Service a2489d
      if(id.length()!=20)
Packit Service a2489d
	 return;
Packit Service a2489d
Packit Service a2489d
      Node *node=FoundNode(id,src,true);
Packit Service a2489d
      if(!node)
Packit Service a2489d
	 return;
Packit Service a2489d
Packit Service a2489d
      const sockaddr_compact& ip=sockaddr_compact::cast(r->lookup_str("ip"));
Packit Service a2489d
      if(ip && !ValidNodeId(node_id,ip)) {
Packit Service a2489d
	 const xstring &src_ip=xstring::get_tmp(src.address());
Packit Service a2489d
	 if(src_ip.eq(ip.address())) {
Packit Service a2489d
	    LogError(2,"%s incorrectly reported our IP as %s",src.to_string(),ip.address());
Packit Service a2489d
	    black_list.Add(src,"1d");
Packit Service a2489d
	    return;
Packit Service a2489d
	 } else if(!ip_voted.lookup(src_ip)) {
Packit Service a2489d
	    ip_voted.add(src_ip,true);
Packit Service a2489d
	    unsigned& votes=ip_votes.lookup_Lv(ip);
Packit Service a2489d
	    votes++;
Packit Service a2489d
	    LogNote(2,"%s reported our IP as %s (votes=%u)",src.to_string(),ip.address(),votes);
Packit Service a2489d
	    if(votes>=4) {
Packit Service a2489d
	       // we have incorrect node_id, restart with correct one.
Packit Service a2489d
	       MakeNodeId(node_id,ip);
Packit Service a2489d
	       LogNote(0,"restarting DHT with new id %s",node_id.hexdump());
Packit Service a2489d
	       Restart();
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
Packit Service a2489d
      if(q.eq("get_peers")) {
Packit Service a2489d
	 const xstring& info_hash=req->GetSearchTarget();
Packit Service a2489d
	 Torrent *torrent=Torrent::FindTorrent(info_hash);
Packit Service a2489d
	 BeNode *values=r->lookup("values",BeNode::BE_LIST);
Packit Service a2489d
	 if(values) {
Packit Service a2489d
	    // some peers found.
Packit Service a2489d
	    for(int i=0; i<values->list.count(); i++) {
Packit Service a2489d
	       if(values->list[i]->type!=BeNode::BE_STR)
Packit Service a2489d
		  continue;
Packit Service a2489d
	       const sockaddr_compact &c=sockaddr_compact::cast(values->list[i]->str);
Packit Service a2489d
	       sockaddr_u a(c);
Packit Service a2489d
	       if(!a.port())
Packit Service a2489d
		  continue;
Packit Service a2489d
	       LogNote(9,"found peer %s for info_hash=%s",a.to_string(),info_hash.hexdump());
Packit Service a2489d
	       if(torrent)
Packit Service a2489d
		  torrent->AddPeer(new TorrentPeer(torrent,&a,TorrentPeer::TR_DHT));
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
	 const xstring& token=r->lookup_str("token");
Packit Service a2489d
	 if(token && torrent) {
Packit Service a2489d
	    if(!ValidNodeId(id,src.compact_addr()))
Packit Service a2489d
	       LogError(2,"warning: node id %s is invalid for %s",id.hexdump(),src.address());
Packit Service a2489d
	    // announce the torrent
Packit Service a2489d
	    int port=torrent->GetPortIPv4();
Packit Service a2489d
#if INET6
Packit Service a2489d
	    if(src.family()==AF_INET6)
Packit Service a2489d
	       port=torrent->GetPortIPv6();
Packit Service a2489d
#endif
Packit Service a2489d
	    xmap_p<BeNode> a;
Packit Service a2489d
	    a.add("info_hash",new BeNode(info_hash));
Packit Service a2489d
	    a.add("port",new BeNode(port));
Packit Service a2489d
	    a.add("token",new BeNode(token));
Packit Service a2489d
	    if(torrent->Complete())
Packit Service a2489d
	       a.add("seed",new BeNode(1));
Packit Service a2489d
	    SendMessage(NewQuery("announce_peer",a),src,id);
Packit Service a2489d
	    torrent->DHT_Announced(src.family());
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      if(q.eq("find_node") || q.eq("get_peers")) {
Packit Service a2489d
	 const xstring& target=req->GetSearchTarget();
Packit Service a2489d
	 const xstring &node_id=req->GetNodeId();
Packit Service a2489d
	 LogNote(9,"got reply for %s with target %s from node %s",q.get(),target.hexdump(),node_id.hexdump());
Packit Service a2489d
Packit Service a2489d
	 Search *s=search.lookup(target);
Packit Service a2489d
	 if(s && s->IsFeasible(node_id)) {
Packit Service a2489d
	    s->best_node_id.set(node_id);
Packit Service a2489d
	    s->depth++;
Packit Service a2489d
	    LogNote(9,"search for %s goes to depth=%d with best_node_id=%s",
Packit Service a2489d
	       s->target_id.hexdump(),s->depth,node_id.hexdump());
Packit Service a2489d
	 }
Packit Service a2489d
Packit Service a2489d
	 const xstring& nodes=r->lookup_str("nodes");
Packit Service a2489d
	 if(nodes) {
Packit Service a2489d
	    LogNote(9,"adding %d nodes",(int)nodes.length()/26);
Packit Service a2489d
	    const char *data=nodes;
Packit Service a2489d
	    int len=nodes.length();
Packit Service a2489d
	    while(len>=26) {
Packit Service a2489d
	       xstring id(data,20);
Packit Service a2489d
	       sockaddr_u a;
Packit Service a2489d
	       a.set_compact(data+20,6);
Packit Service a2489d
	       data+=26;
Packit Service a2489d
	       len-=26;
Packit Service a2489d
	       Node *new_node=FoundNode(id,a,false,s);
Packit Service a2489d
	       if(new_node)
Packit Service a2489d
		  new_node->SetOrigin(node);
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
#if INET6
Packit Service a2489d
	 const xstring& nodes6=r->lookup_str("nodes6");
Packit Service a2489d
	 if(nodes6) {
Packit Service a2489d
	    LogNote(9,"adding %d nodes6",(int)nodes6.length()/38);
Packit Service a2489d
	    const char *data=nodes6;
Packit Service a2489d
	    int len=nodes6.length();
Packit Service a2489d
	    while(len>=38) {
Packit Service a2489d
	       xstring id(data,20);
Packit Service a2489d
	       sockaddr_u a;
Packit Service a2489d
	       a.set_compact(data+20,18);
Packit Service a2489d
	       data+=38;
Packit Service a2489d
	       len-=38;
Packit Service a2489d
	       Node *new_node=FoundNode(id,a,false,s);
Packit Service a2489d
	       if(new_node)
Packit Service a2489d
		  new_node->SetOrigin(node);
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
#endif //INET6
Packit Service a2489d
      }
Packit Service a2489d
   } else if(y.eq("e")) { // error
Packit Service a2489d
      int code=0;
Packit Service a2489d
      const char *msg="unknown";
Packit Service a2489d
      BeNode *e=p->lookup("e",BeNode::BE_LIST);
Packit Service a2489d
      if(e) {
Packit Service a2489d
	 if(e->list.count()>=1 && e->list[0]->type==BeNode::BE_INT)
Packit Service a2489d
	    code=e->list[0]->num;
Packit Service a2489d
	 if(e->list.count()>=2 && e->list[1]->type==BeNode::BE_STR)
Packit Service a2489d
	    msg=e->list[1]->str;
Packit Service a2489d
      }
Packit Service a2489d
      LogError(2,"got DHT error for %s (%d: %s) from %s",q.get(),code,msg,src.to_string());
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
bool DHT::Search::IsFeasible(const xstring &id) const
Packit Service a2489d
{
Packit Service a2489d
   if(!best_node_id)
Packit Service a2489d
      return true;
Packit Service a2489d
   for(int i=0; i<20; i++) {
Packit Service a2489d
      unsigned char a=id[i]^target_id[i];
Packit Service a2489d
      unsigned char b=best_node_id[i]^target_id[i];
Packit Service a2489d
      if(a
Packit Service a2489d
	 return true;
Packit Service a2489d
      if(a>b)
Packit Service a2489d
	 return false;
Packit Service a2489d
   }
Packit Service a2489d
   return false;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::Search::ContinueOn(DHT *d,const Node *n)
Packit Service a2489d
{
Packit Service a2489d
   if(searched.exists(n->id)) {
Packit Service a2489d
      LogNote(9,"skipping search on %s, already searched",n->GetName());
Packit Service a2489d
      return;
Packit Service a2489d
   }
Packit Service a2489d
   LogNote(3,"search for %s continues on %s (%s) depth=%d",
Packit Service a2489d
      target_id.hexdump(),n->id.hexdump(),n->GetName(),depth);
Packit Service a2489d
Packit Service a2489d
   xmap_p<BeNode> a;
Packit Service a2489d
#if INET6
Packit Service a2489d
   if(bootstrap) {
Packit Service a2489d
	 xarray_p<BeNode> want;
Packit Service a2489d
	 want.append(new BeNode("n4"));
Packit Service a2489d
	 want.append(new BeNode("n6"));
Packit Service a2489d
	 a.add("want",new BeNode(&want));
Packit Service a2489d
   }
Packit Service a2489d
#endif
Packit Service a2489d
   if(!want_peers) {
Packit Service a2489d
      a.add("target",new BeNode(target_id));
Packit Service a2489d
      d->SendMessage(d->NewQuery("find_node",a),n->addr,n->id);
Packit Service a2489d
   } else {
Packit Service a2489d
      a.add("info_hash",new BeNode(target_id));
Packit Service a2489d
      if(noseed)
Packit Service a2489d
	 a.add("noseed",new BeNode(1));
Packit Service a2489d
      d->SendMessage(d->NewQuery("get_peers",a),n->addr,n->id);
Packit Service a2489d
   }
Packit Service a2489d
   searched.add(n->id,true);
Packit Service a2489d
   search_timer.Reset();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::StartSearch(Search *s)
Packit Service a2489d
{
Packit Service a2489d
   LogNote(9,"starting search for %s",s->target_id.hexdump());
Packit Service a2489d
   xarray<Node*> n;
Packit Service a2489d
   FindNodes(s->target_id,n,K,true);
Packit Service a2489d
   if(n.count()<=K/2) {
Packit Service a2489d
      LogNote(2,"too few good nodes found in the routing table");
Packit Service a2489d
      FindNodes(s->target_id,n,K,false);
Packit Service a2489d
      if(n.count()==0)
Packit Service a2489d
	 LogError(1,"no nodes found in the routing table");
Packit Service a2489d
   }
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      s->ContinueOn(this,n[i]);
Packit Service a2489d
   search.add(s->target_id,s);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::RestartSearch(Search *s)
Packit Service a2489d
{
Packit Service a2489d
   xarray<Node*> n;
Packit Service a2489d
   FindNodes(s->target_id,n,K,true,&s->searched);
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      s->ContinueOn(this,n[i]);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
DHT::Node *DHT::GetOrigin(const Node *n) {
Packit Service a2489d
   if(!n->origin_id)
Packit Service a2489d
      return 0;
Packit Service a2489d
   Node *origin=nodes.lookup(n->origin_id);
Packit Service a2489d
   if(!origin || origin==n)
Packit Service a2489d
      return 0;
Packit Service a2489d
   return origin;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
DHT::Node *DHT::FoundNode(const xstring& id,const sockaddr_u& a,bool responded,Search *s)
Packit Service a2489d
{
Packit Service a2489d
   if(a.port()==0 || a.is_private() || a.is_reserved() || a.is_multicast()) {
Packit Service a2489d
      LogError(9,"node address %s is not valid",a.to_string());
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(a.family()!=af)
Packit Service a2489d
      return 0;
Packit Service a2489d
Packit Service a2489d
   if(id.eq(node_id)) {
Packit Service a2489d
      LogNote(9,"node %s has our own id",a.to_string());
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(black_list.Listed(a)) {
Packit Service a2489d
      LogNote(9,"node %s is blacklisted",a.to_string());
Packit Service a2489d
      return 0;
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   Node *n=nodes.lookup(id);
Packit Service a2489d
   if(!n) {
Packit Service a2489d
      n=node_by_addr.lookup(a.compact());
Packit Service a2489d
      if(n) {
Packit Service a2489d
	 if(!responded) {
Packit Service a2489d
	    // change node id only by a message from the node.
Packit Service a2489d
	    return 0;
Packit Service a2489d
	 }
Packit Service a2489d
	 if(n->id_change_count>0) {
Packit Service a2489d
	    LogError(9,"%s changes node id again",n->addr.to_string());
Packit Service a2489d
	    BlackListNode(n,"1d");
Packit Service a2489d
	    return 0;
Packit Service a2489d
	 }
Packit Service a2489d
	 ChangeNodeId(n,id);
Packit Service a2489d
      } else {
Packit Service a2489d
	 n=new Node(id,a);
Packit Service a2489d
	 AddNode(n);
Packit Service a2489d
      }
Packit Service a2489d
   } else {
Packit Service a2489d
      AddRoute(n);
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(responded) {
Packit Service a2489d
      n->responded=true;
Packit Service a2489d
      n->ResetLostPing();
Packit Service a2489d
      Node *origin=GetOrigin(n);
Packit Service a2489d
      if(origin)
Packit Service a2489d
	 origin->bad_node_count/=2;
Packit Service a2489d
   }
Packit Service a2489d
   if(n->responded)
Packit Service a2489d
      n->SetGood();
Packit Service a2489d
Packit Service a2489d
   // continue search
Packit Service a2489d
   if(s && s->IsFeasible(n))
Packit Service a2489d
      s->ContinueOn(this,n);
Packit Service a2489d
Packit Service a2489d
   return n;
Packit Service a2489d
}
Packit Service a2489d
void DHT::RemoveNode(Node *n)
Packit Service a2489d
{
Packit Service a2489d
   Node *origin=GetOrigin(n);
Packit Service a2489d
   if(origin && !n->responded && n->ping_lost_count>1) {
Packit Service a2489d
      origin->bad_node_count++;
Packit Service a2489d
      if(origin->bad_node_count>2*K)
Packit Service a2489d
	 BlackListNode(origin,"1h");
Packit Service a2489d
   }
Packit Service a2489d
   RemoveRoute(n);
Packit Service a2489d
   node_by_addr.remove(n->addr.compact());
Packit Service a2489d
   nodes.remove(n->id);
Packit Service a2489d
}
Packit Service a2489d
void DHT::ChangeNodeId(Node *n,const xstring& new_node_id)
Packit Service a2489d
{
Packit Service a2489d
   LogNote(1,"node_id changed for %s, old_node_id=%s, new_node_id=%s",
Packit Service a2489d
      n->GetName(),n->id.hexdump(),new_node_id.hexdump());
Packit Service a2489d
   n->id_change_count++;
Packit Service a2489d
Packit Service a2489d
   // change node_id in the in-flight requests
Packit Service a2489d
   for(Request *r=sent_req.each_begin(); r; r=sent_req.each_next()) {
Packit Service a2489d
      if(r->node_id.eq(n->id) && r->addr==n->addr)
Packit Service a2489d
	 r->node_id.set(new_node_id);
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   RemoveRoute(n);
Packit Service a2489d
   nodes.borrow(n->id);	// borrow to avoid freeing the node
Packit Service a2489d
   n->id.set(new_node_id);
Packit Service a2489d
   nodes.add(n->id,n);
Packit Service a2489d
   AddRoute(n);
Packit Service a2489d
}
Packit Service a2489d
void DHT::BlackListNode(Node *n,const char *timeout)
Packit Service a2489d
{
Packit Service a2489d
   black_list.Add(n->addr,timeout);
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      if(send_queue[i]->node_id.eq(n->id))
Packit Service a2489d
	 send_queue.remove(i);
Packit Service a2489d
   }
Packit Service a2489d
   for(const Request *r=sent_req.each_begin(); r; r=sent_req.each_next()) {
Packit Service a2489d
      if(r->GetNodeId().eq(n->id))
Packit Service a2489d
	 sent_req.remove(sent_req.each_key());
Packit Service a2489d
   }
Packit Service a2489d
   RemoveNode(n);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::AddNode(Node *n)
Packit Service a2489d
{
Packit Service a2489d
   assert(n->id.length()==20);
Packit Service a2489d
   assert(!nodes.exists(n->id));
Packit Service a2489d
   assert(!node_by_addr.exists(n->addr.compact()));
Packit Service a2489d
Packit Service a2489d
   nodes.add(n->id,n);
Packit Service a2489d
   node_by_addr.add(n->addr.compact(),n);
Packit Service a2489d
Packit Service a2489d
   AddRoute(n);
Packit Service a2489d
Packit Service a2489d
   if(nodes.count()==1 && search.count()==0 && !state_io)
Packit Service a2489d
      Bootstrap();
Packit Service a2489d
}
Packit Service a2489d
int DHT::FindRoute(const xstring& id,int i,int skew)
Packit Service a2489d
{
Packit Service a2489d
   // routes are ordered by prefix length decreasing
Packit Service a2489d
   // the first route bucket always matches our node_id
Packit Service a2489d
   for( ; i
Packit Service a2489d
      if(routes[i]->PrefixMatch(id,skew)) {
Packit Service a2489d
	 return i;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   return -1;
Packit Service a2489d
}
Packit Service a2489d
void DHT::RemoveRoute(Node *n)
Packit Service a2489d
{
Packit Service a2489d
   int i=FindRoute(n->id);
Packit Service a2489d
   if(i==-1)
Packit Service a2489d
      return;
Packit Service a2489d
   routes[i]->RemoveNode(n);
Packit Service a2489d
}
Packit Service a2489d
void DHT::AddRoute(Node *n)
Packit Service a2489d
{
Packit Service a2489d
try_again:
Packit Service a2489d
   int i=FindRoute(n->id);
Packit Service a2489d
   if(i==-1) {
Packit Service a2489d
      assert(routes.count()==0);
Packit Service a2489d
      routes.append(new RouteBucket(0,xstring::null));
Packit Service a2489d
      i=0;
Packit Service a2489d
   }
Packit Service a2489d
   const Ref<RouteBucket>& r=routes[i];
Packit Service a2489d
   xarray<Node*> &nodes=r->nodes;
Packit Service a2489d
   // check if the node is already in the bucket
Packit Service a2489d
   for(int j=0; j
Packit Service a2489d
      if(nodes[j]==n) {
Packit Service a2489d
	 // K nodes are stable, other nodes are candidates
Packit Service a2489d
	 if(j
Packit Service a2489d
	    // move the stable node to the end of K set
Packit Service a2489d
	    // and set the bucket as fresh.
Packit Service a2489d
	    r->SetFresh();
Packit Service a2489d
	    nodes.remove(j);
Packit Service a2489d
	    if(nodes.count()
Packit Service a2489d
	       nodes.append(n);
Packit Service a2489d
	    else
Packit Service a2489d
	       nodes.insert(n,K-1);
Packit Service a2489d
	    // remove a candidate
Packit Service a2489d
	 }
Packit Service a2489d
	 return;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   // remove a bad node to free space
Packit Service a2489d
   if(nodes.count()>=K) {
Packit Service a2489d
      for(int j=0; j
Packit Service a2489d
	 if(nodes[j]->IsBad()) {
Packit Service a2489d
	    r->RemoveNode(j);
Packit Service a2489d
	    break;
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   // prefer responded nodes
Packit Service a2489d
   if(i>0 && nodes.count()>=K && n->responded) {
Packit Service a2489d
      for(int j=0; j
Packit Service a2489d
	 if(!nodes[j]->responded) {
Packit Service a2489d
	    r->RemoveNode(j);
Packit Service a2489d
	    break;
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   // remove a non-good and not responded node to free space
Packit Service a2489d
   if(i>0 && nodes.count()>=K) {
Packit Service a2489d
      for(int j=0; j
Packit Service a2489d
	 if(!nodes[j]->IsGood() && !nodes[j]->responded) {
Packit Service a2489d
	    r->RemoveNode(j);
Packit Service a2489d
	    break;
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
Packit Service a2489d
   if(state_io && i==0 && nodes.count()>=K && SplitRoute0())
Packit Service a2489d
      goto try_again;
Packit Service a2489d
Packit Service a2489d
   if(nodes.count()>=K) {
Packit Service a2489d
      int q_num=PingQuestionable(nodes,nodes.count()-K+1);
Packit Service a2489d
      // check if we have already candidates for the questionable nodes
Packit Service a2489d
      if(nodes.count()>=K+q_num) {
Packit Service a2489d
	 if(i==0 && SplitRoute0())
Packit Service a2489d
	    goto try_again;
Packit Service a2489d
	 else
Packit Service a2489d
	    LogNote(9,"skipping node %s, route bucket %d (prefix=%s) has %d nodes",n->GetName(),i,r->to_string(),nodes.count());
Packit Service a2489d
	 return;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   r->SetFresh();
Packit Service a2489d
   LogNote(3,"adding node %s to route bucket %d (prefix=%s)",n->GetName(),i,r->to_string());
Packit Service a2489d
   n->in_routes=true;
Packit Service a2489d
   nodes.append(n);
Packit Service a2489d
}
Packit Service a2489d
bool DHT::SplitRoute0()
Packit Service a2489d
{
Packit Service a2489d
   const Ref<RouteBucket>& r=routes[0];
Packit Service a2489d
   xarray<Node*> &nodes=r->nodes;
Packit Service a2489d
Packit Service a2489d
   if(nodes.count()<K || r->prefix_bits>=160)
Packit Service a2489d
      return false;
Packit Service a2489d
   if(routes.count()>1 && !routes[1]->HasGoodNodes() && !state_io)
Packit Service a2489d
      return false;
Packit Service a2489d
Packit Service a2489d
   LogNote(9,"splitting route bucket 0, nodes=%d",nodes.count());
Packit Service a2489d
   int bits=r->prefix_bits;
Packit Service a2489d
   size_t byte=bits/8;
Packit Service a2489d
   unsigned mask = 1<<(7-bits%8);
Packit Service a2489d
   if(r->prefix.length()<=byte)
Packit Service a2489d
      r->prefix.append('\0');
Packit Service a2489d
   xstring p1(r->prefix.copy());
Packit Service a2489d
   xstring p2(r->prefix.copy());
Packit Service a2489d
   // new bit in p1 is already cleared.
Packit Service a2489d
   p2.get_non_const()[byte]|=mask; // set new bit in p2
Packit Service a2489d
   RouteBucket *b1=new RouteBucket(bits+1,p1);
Packit Service a2489d
   RouteBucket *b2=new RouteBucket(bits+1,p2);
Packit Service a2489d
   // distribute the nodes between two buckets
Packit Service a2489d
   for(int j=0; j
Packit Service a2489d
      if(nodes[j]->id[byte]&mask)
Packit Service a2489d
	 b2->nodes.append(nodes[j]);
Packit Service a2489d
      else
Packit Service a2489d
	 b1->nodes.append(nodes[j]);
Packit Service a2489d
   }
Packit Service a2489d
   if(node_id[byte]&mask) {
Packit Service a2489d
      routes[0]=b2;
Packit Service a2489d
      routes.insert(b1,1);
Packit Service a2489d
   } else {
Packit Service a2489d
      routes[0]=b1;
Packit Service a2489d
      routes.insert(b2,1);
Packit Service a2489d
   }
Packit Service a2489d
   LogNote(9,"new route[0] prefix=%s nodes=%d",routes[0]->to_string(),routes[0]->nodes.count());
Packit Service a2489d
   LogNote(9,"new route[1] prefix=%s nodes=%d",routes[1]->to_string(),routes[1]->nodes.count());
Packit Service a2489d
   assert(routes[0]->PrefixMatch(node_id));
Packit Service a2489d
   return true;
Packit Service a2489d
}
Packit Service a2489d
int DHT::PingQuestionable(const xarray<Node*>& nodes,int limit)
Packit Service a2489d
{
Packit Service a2489d
   int q_num=0;
Packit Service a2489d
   // ping questionable nodes, return the number of questionable nodes
Packit Service a2489d
   for(int j=0; j
Packit Service a2489d
      Node *n=nodes[j];
Packit Service a2489d
      if(!n->IsGood()) {
Packit Service a2489d
	 q_num++;
Packit Service a2489d
	 if(n->ping_timer.Stopped())
Packit Service a2489d
	    SendPing(n);
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   return q_num;
Packit Service a2489d
}
Packit Service a2489d
void DHT::RouteBucket::RemoveNode(Node *n)
Packit Service a2489d
{
Packit Service a2489d
   for(int j=0; j
Packit Service a2489d
      if(nodes[j]==n) {
Packit Service a2489d
	 RemoveNode(j);
Packit Service a2489d
	 break;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
void DHT::RouteBucket::RemoveNode(int i)
Packit Service a2489d
{
Packit Service a2489d
   assert(i>=0 && i
Packit Service a2489d
   nodes[i]->in_routes=false;
Packit Service a2489d
   nodes.remove(i);
Packit Service a2489d
}
Packit Service a2489d
bool DHT::RouteBucket::PrefixMatch(const xstring& id,int skew) const
Packit Service a2489d
{
Packit Service a2489d
   assert(skew>=0);
Packit Service a2489d
   int prefix_bits=this->prefix_bits-skew;
Packit Service a2489d
   if(prefix_bits<=0)
Packit Service a2489d
      return true;
Packit Service a2489d
   int bytes=prefix_bits/8;
Packit Service a2489d
   int bits=prefix_bits%8;
Packit Service a2489d
   unsigned mask=~((1<<(8-bits))-1);
Packit Service a2489d
   if(bytes>0 && memcmp(prefix.get(),id.get(),bytes))
Packit Service a2489d
      return false;
Packit Service a2489d
   if(bits>0 && (prefix[bytes]&mask)!=(id[bytes]&mask))
Packit Service a2489d
      return false;
Packit Service a2489d
   return true;
Packit Service a2489d
}
Packit Service a2489d
const char *DHT::RouteBucket::to_string() const
Packit Service a2489d
{
Packit Service a2489d
   xstring &buf=xstring::get_tmp("",0);
Packit Service a2489d
   prefix.hexdump_to(buf);
Packit Service a2489d
   buf.truncate((prefix_bits+3)/4);
Packit Service a2489d
   buf.append('/');
Packit Service a2489d
   buf.appendf("%d",prefix_bits);
Packit Service a2489d
   return buf;
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::FindNodes(const xstring& target_id,xarray<Node*> &a,int max_count,bool only_good,const xmap<bool> *exclude)
Packit Service a2489d
{
Packit Service a2489d
   a.truncate();
Packit Service a2489d
   for(int skew=0; skew<160; skew++) {
Packit Service a2489d
      int i=FindRoute(target_id,0,skew);
Packit Service a2489d
      if(i<0)
Packit Service a2489d
	 continue;
Packit Service a2489d
      const xarray<Node*> &nodes=routes[i]->nodes;
Packit Service a2489d
      for(int j=0; j
Packit Service a2489d
	 if(!nodes[j]->IsBad() && (!only_good || nodes[j]->IsGood())
Packit Service a2489d
	 && nodes[j]->ping_lost_count<2 && a.search(nodes[j])==-1
Packit Service a2489d
	 && (!exclude || !exclude->exists(nodes[j]->id))) {
Packit Service a2489d
	    a.append(nodes[j]);
Packit Service a2489d
	    if(a.count()>=max_count)
Packit Service a2489d
	       return;
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
void DHT::AddPeer(const xstring& info_hash,const sockaddr_compact& a,bool seed)
Packit Service a2489d
{
Packit Service a2489d
   KnownTorrent *t=torrents.lookup(info_hash);
Packit Service a2489d
   if(!t) {
Packit Service a2489d
      if(torrents.count()>=MAX_TORRENTS) {
Packit Service a2489d
	 // remove random torrent
Packit Service a2489d
	 int r=random()/13%torrents.count();
Packit Service a2489d
	 int i=0;
Packit Service a2489d
	 for(torrents.each_begin(); ; torrents.each_next(), i++) {
Packit Service a2489d
	    if(i==r) {
Packit Service a2489d
	       torrents.remove(torrents.each_key());
Packit Service a2489d
	       break;
Packit Service a2489d
	    }
Packit Service a2489d
	 }
Packit Service a2489d
      }
Packit Service a2489d
      torrents.add(info_hash,t=new KnownTorrent());
Packit Service a2489d
   }
Packit Service a2489d
   t->AddPeer(new Peer(a,seed));
Packit Service a2489d
Packit Service a2489d
   sockaddr_u addr(a);
Packit Service a2489d
   LogNote(9,"added peer %s to torrent %s",addr.to_string(),info_hash.hexdump());
Packit Service a2489d
}
Packit Service a2489d
void DHT::KnownTorrent::AddPeer(Peer *peer)
Packit Service a2489d
{
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      if(peers[i]->compact_addr.eq(peer->compact_addr)) {
Packit Service a2489d
	 peers.remove(i);
Packit Service a2489d
	 break;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   if(peers.count()>=MAX_PEERS)
Packit Service a2489d
      peers.remove(0);
Packit Service a2489d
   peers.append(peer);
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::MakeNodeId(xstring &id,const sockaddr_compact& ip,int r)
Packit Service a2489d
{
Packit Service a2489d
   // http://www.rasterbar.com/products/libtorrent/dht_sec.html
Packit Service a2489d
   static char v4mask[] = { 0x01, 0x07, 0x1f, 0x7f };
Packit Service a2489d
   static char v6mask[] = { 0x00, 0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f };
Packit Service a2489d
Packit Service a2489d
   char *mask=(ip.length()==4?v4mask:v6mask);
Packit Service a2489d
   int len=(ip.length()==4?sizeof(v4mask):sizeof(v6mask));
Packit Service a2489d
   int i;
Packit Service a2489d
Packit Service a2489d
   xstring seed;
Packit Service a2489d
   for(i=0; i
Packit Service a2489d
      seed.append(char(ip[i] & mask[i]));
Packit Service a2489d
Packit Service a2489d
   seed.append(char(r&7));
Packit Service a2489d
Packit Service a2489d
   Torrent::SHA1(seed,id);
Packit Service a2489d
Packit Service a2489d
   for(i=4; i<19; i++)
Packit Service a2489d
      id.get_non_const()[i] = random()/13;
Packit Service a2489d
   id.get_non_const()[19] = r;
Packit Service a2489d
}
Packit Service a2489d
bool DHT::ValidNodeId(const xstring& id,const sockaddr_compact& ip)
Packit Service a2489d
{
Packit Service a2489d
   if(id.length()!=20)
Packit Service a2489d
      return false;
Packit Service a2489d
Packit Service a2489d
   sockaddr_u addr(ip);
Packit Service a2489d
   if(!addr.family())
Packit Service a2489d
      return false;
Packit Service a2489d
Packit Service a2489d
   if(addr.is_loopback() || addr.is_private())
Packit Service a2489d
      return true;
Packit Service a2489d
Packit Service a2489d
   xstring id1;
Packit Service a2489d
   MakeNodeId(id1,ip,id[19]);
Packit Service a2489d
   return !memcmp(id,id1,4);
Packit Service a2489d
}
Packit Service a2489d
void DHT::Restart()
Packit Service a2489d
{
Packit Service a2489d
   ip_voted.empty();
Packit Service a2489d
   ip_votes.empty();
Packit Service a2489d
   routes.truncate();
Packit Service a2489d
   // re-add known nodes to route buckets
Packit Service a2489d
   for(Node *n=nodes.each_begin(); n; n=nodes.each_next()) {
Packit Service a2489d
      if(n->IsGood())
Packit Service a2489d
	 AddRoute(n);
Packit Service a2489d
   }
Packit Service a2489d
}
Packit Service a2489d
void DHT::Save(const SMTaskRef<IOBuffer>& buf)
Packit Service a2489d
{
Packit Service a2489d
   Enter(this);
Packit Service a2489d
   xmap_p<BeNode> state;
Packit Service a2489d
   state.add("id",new BeNode(node_id));
Packit Service a2489d
   int count=0;
Packit Service a2489d
   int responded_count=0;
Packit Service a2489d
   xstring compact_nodes;
Packit Service a2489d
   for(Node *n=nodes.each_begin(); n; n=nodes.each_next()) {
Packit Service a2489d
      if(n->IsGood() || n->in_routes) {
Packit Service a2489d
	 compact_nodes.append(n->id);
Packit Service a2489d
	 compact_nodes.append(n->addr.compact());
Packit Service a2489d
	 count++;
Packit Service a2489d
	 responded_count+=n->responded;
Packit Service a2489d
      }
Packit Service a2489d
   }
Packit Service a2489d
   LogNote(9,"saving state, %d nodes (%d responded)",count,responded_count);
Packit Service a2489d
   if(compact_nodes)
Packit Service a2489d
      state.add("nodes",new BeNode(compact_nodes));
Packit Service a2489d
   BeNode(&state).Pack(buf);
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      const RouteBucket *r=routes[i];
Packit Service a2489d
      LogNote(9,"route bucket %d: nodes count=%d prefix=%s",i,
Packit Service a2489d
	 (int)r->nodes.count(),r->to_string());
Packit Service a2489d
   }
Packit Service a2489d
   Leave();
Packit Service a2489d
}
Packit Service a2489d
void DHT::Load(const SMTaskRef<IOBuffer>& buf)
Packit Service a2489d
{
Packit Service a2489d
   int rest;
Packit Service a2489d
   Ref<BeNode> state(BeNode::Parse(buf->Get(),buf->Size(),&rest));
Packit Service a2489d
   if(!state || state->type!=BeNode::BE_DICT)
Packit Service a2489d
      return;
Packit Service a2489d
   const xstring& b_id=state->lookup_str("id");
Packit Service a2489d
   if(b_id.length()==20) {
Packit Service a2489d
      node_id.set(b_id);
Packit Service a2489d
      Restart();
Packit Service a2489d
   }
Packit Service a2489d
   const xstring& nodes=state->lookup_str("nodes");
Packit Service a2489d
   if(!nodes)
Packit Service a2489d
      return;
Packit Service a2489d
   const char *data=nodes;
Packit Service a2489d
   int len=nodes.length();
Packit Service a2489d
   const int addr_len=(af==AF_INET?6:18);
Packit Service a2489d
   const int node_len=20+addr_len;
Packit Service a2489d
   while(len>=node_len) {
Packit Service a2489d
      xstring id(data,20);
Packit Service a2489d
      sockaddr_u a;
Packit Service a2489d
      a.set_compact(data+20,addr_len);
Packit Service a2489d
      data+=node_len;
Packit Service a2489d
      len-=node_len;
Packit Service a2489d
      FoundNode(id,a,false);
Packit Service a2489d
   }
Packit Service a2489d
   // refresh routes after loading
Packit Service a2489d
   for(int i=0; i
Packit Service a2489d
      routes[i]->fresh_timer.StopDelayed(i*15+3);
Packit Service a2489d
}
Packit Service a2489d
void DHT::Save()
Packit Service a2489d
{
Packit Service a2489d
   if(!state_file)
Packit Service a2489d
      return;
Packit Service a2489d
   FileStream *f=new FileStream(state_file,O_WRONLY|O_TRUNC|O_CREAT);
Packit Service a2489d
   f->set_lock();
Packit Service a2489d
   f->set_create_mode(0600);
Packit Service a2489d
   f->dont_keep_backup();
Packit Service a2489d
   state_io=new IOBufferFDStream(f,IOBuffer::PUT);
Packit Service a2489d
   Save(state_io);
Packit Service a2489d
   state_io->PutEOF();
Packit Service a2489d
}
Packit Service a2489d
void DHT::Load()
Packit Service a2489d
{
Packit Service a2489d
   if(!state_file)
Packit Service a2489d
      return;
Packit Service a2489d
   FileStream *f=new FileStream(state_file,O_RDONLY);
Packit Service a2489d
   f->set_lock();
Packit Service a2489d
   state_io=new IOBufferFDStream(f,IOBuffer::GET);
Packit Service a2489d
   state_io->Roll();
Packit Service a2489d
   Roll();
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
void DHT::Reconfig(const char *name)
Packit Service a2489d
{
Packit Service a2489d
   rate_limit.Reconfig(name,"DHT");
Packit Service a2489d
}
Packit Service a2489d
Packit Service a2489d
bool DHT::BlackList::Listed(const sockaddr_u& addr)
Packit Service a2489d
{
Packit Service a2489d
   const xstring &key=addr.to_xstring();
Packit Service a2489d
   Timer *e=bl.lookup(key);
Packit Service a2489d
   if(!e)
Packit Service a2489d
      return false;
Packit Service a2489d
   if(e->Stopped()) {
Packit Service a2489d
      LogNote(4,"black-delisting node %s\n",key.get());
Packit Service a2489d
      bl.remove(key);
Packit Service a2489d
      return false;
Packit Service a2489d
   }
Packit Service a2489d
   return true;
Packit Service a2489d
}
Packit Service a2489d
void DHT::BlackList::Add(const sockaddr_u &a,const char *t)
Packit Service a2489d
{
Packit Service a2489d
   if(Listed(a))
Packit Service a2489d
      return;
Packit Service a2489d
   LogNote(4,"black-listing node %s (%s)\n",a.to_string(),t);
Packit Service a2489d
   bl.add(a.to_xstring(),new Timer(TimeIntervalR(t)));
Packit Service a2489d
}