Blame src/DHT.cc

Packit 8f70b4
/*
Packit 8f70b4
 * lftp - file transfer program
Packit 8f70b4
 *
Packit 8f70b4
 * Copyright (c) 2012-2016 by Alexander V. Lukyanov (lav@yars.free.net)
Packit 8f70b4
 *
Packit 8f70b4
 * This program is free software; you can redistribute it and/or modify
Packit 8f70b4
 * it under the terms of the GNU General Public License as published by
Packit 8f70b4
 * the Free Software Foundation; either version 3 of the License, or
Packit 8f70b4
 * (at your option) any later version.
Packit 8f70b4
 *
Packit 8f70b4
 * This program is distributed in the hope that it will be useful,
Packit 8f70b4
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
Packit 8f70b4
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
Packit 8f70b4
 * GNU General Public License for more details.
Packit 8f70b4
 *
Packit 8f70b4
 * You should have received a copy of the GNU General Public License
Packit 8f70b4
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
Packit 8f70b4
 */
Packit 8f70b4
Packit 8f70b4
#include <config.h>
Packit 8f70b4
#include <stdlib.h>
Packit 8f70b4
#include <assert.h>
Packit 8f70b4
#include <sys/types.h>
Packit 8f70b4
#include <sys/stat.h>
Packit 8f70b4
#include <sys/socket.h>
Packit 8f70b4
#include <netinet/in.h>
Packit 8f70b4
#include <arpa/inet.h>
Packit 8f70b4
#include <unistd.h>
Packit 8f70b4
#include <fcntl.h>
Packit 8f70b4
#include <errno.h>
Packit 8f70b4
#include <sha1.h>
Packit 8f70b4
Packit 8f70b4
#include "Torrent.h"
Packit 8f70b4
#include "DHT.h"
Packit 8f70b4
#include "log.h"
Packit 8f70b4
#include "url.h"
Packit 8f70b4
#include "misc.h"
Packit 8f70b4
#include "plural.h"
Packit 8f70b4
Packit 8f70b4
DHT::DHT(int af,const xstring& id)
Packit 8f70b4
   : af(af), rate_limit("DHT"),
Packit 8f70b4
     sent_req_expire_scan(5), search_cleanup_timer(5),
Packit 8f70b4
     refresh_timer(1), nodes_cleanup_timer(30), save_timer(300),
Packit 8f70b4
     node_id(id.copy()), t(random())
Packit 8f70b4
{
Packit 8f70b4
   LogNote(10,"creating DHT with id=%s",node_id.hexdump());
Packit 8f70b4
   Reconfig(0);
Packit 8f70b4
}
Packit 8f70b4
DHT::~DHT()
Packit 8f70b4
{
Packit 8f70b4
}
Packit 8f70b4
void DHT::Bootstrap()
Packit 8f70b4
{
Packit 8f70b4
   // run bootstrap search
Packit 8f70b4
   LogNote(9,"bootstrapping");
Packit 8f70b4
   Search *s=new Search(node_id);
Packit 8f70b4
   s->Bootstrap();
Packit 8f70b4
   StartSearch(s);
Packit 8f70b4
}
Packit 8f70b4
int DHT::Do()
Packit 8f70b4
{
Packit 8f70b4
   int m=STALL;
Packit 8f70b4
   if(state_io) {
Packit 8f70b4
      if(state_io->GetDirection()==IOBuffer::GET) {
Packit 8f70b4
	 if(state_io->Error()) {
Packit 8f70b4
	    LogError(1,"loading state: %s",state_io->ErrorText());
Packit 8f70b4
	    state_io=0;
Packit 8f70b4
	    m=MOVED;
Packit 8f70b4
	 } else if(state_io->Eof()) {
Packit 8f70b4
	    Load(state_io);
Packit 8f70b4
	    state_io=0;
Packit 8f70b4
	    m=MOVED;
Packit 8f70b4
	 }
Packit 8f70b4
      } else {
Packit 8f70b4
	 if(state_io->Error())
Packit 8f70b4
	    LogError(1,"saving state: %s",state_io->ErrorText());
Packit 8f70b4
	 if(state_io->Done()) {
Packit 8f70b4
	    state_io=0;
Packit 8f70b4
	    m=MOVED;
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   if(sent_req_expire_scan.Stopped()) {
Packit 8f70b4
      for(const Request *r=sent_req.each_begin(); r; r=sent_req.each_next()) {
Packit 8f70b4
	 if(!r->Expired())
Packit 8f70b4
	    continue;
Packit 8f70b4
	 Ref<Request> rr(sent_req.borrow(sent_req.each_key()));
Packit 8f70b4
	 LogError(4,"DHT request %s to %s timed out",r->data->lookup_str("q").get(),r->addr.to_string());
Packit 8f70b4
	 Node *n=nodes.lookup(r->GetNodeId());
Packit 8f70b4
	 if(n) {
Packit 8f70b4
	    n->LostPing();
Packit 8f70b4
	    LogNote(4,"DHT node %s has lost %d packets",n->GetName(),n->ping_lost_count);
Packit 8f70b4
	 }
Packit 8f70b4
	 const xstring& target=r->GetSearchTarget();
Packit 8f70b4
	 if(target) {
Packit 8f70b4
	    Search *s=search.lookup(target);
Packit 8f70b4
	    // if we have lost a search request and have not received any
Packit 8f70b4
	    // reply yet - try to restart the search with good nodes.
Packit 8f70b4
	    if(s && !s->best_node_id)
Packit 8f70b4
	       RestartSearch(s);
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      sent_req_expire_scan.Reset();
Packit 8f70b4
   }
Packit 8f70b4
   if(search_cleanup_timer.Stopped()) {
Packit 8f70b4
      for(Search *s=search.each_begin(); s; s=search.each_next()) {
Packit 8f70b4
	 if(s->search_timer.Stopped())
Packit 8f70b4
	    search.remove(search.each_key());
Packit 8f70b4
      }
Packit 8f70b4
      search_cleanup_timer.Reset();
Packit 8f70b4
   }
Packit 8f70b4
   if(nodes_cleanup_timer.Stopped()) {
Packit 8f70b4
      for(Node *n=nodes.each_begin(); n; n=nodes.each_next()) {
Packit 8f70b4
	 if(n->IsBad()) {
Packit 8f70b4
	    LogNote(9,"removing bad node %s",n->GetName());
Packit 8f70b4
	    RemoveNode(n);
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      if(nodes.count()>MAX_NODES) {
Packit 8f70b4
	 // remove some nodes.
Packit 8f70b4
	 int to_remove=nodes.count()-MAX_NODES;
Packit 8f70b4
	 for(Node *n=nodes.each_begin(); n && to_remove>0; n=nodes.each_next()) {
Packit 8f70b4
	    if(!n->IsGood() && !n->in_routes) {
Packit 8f70b4
	       LogNote(9,"removing node %s (not good)",n->GetName());
Packit 8f70b4
	       RemoveNode(n);
Packit 8f70b4
	       to_remove--;
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
	 for(Node *n=nodes.each_begin(); n && to_remove>0; n=nodes.each_next()) {
Packit 8f70b4
	    if(!n->in_routes && !n->responded) {
Packit 8f70b4
	       LogNote(9,"removing node %s (never responded)",n->GetName());
Packit 8f70b4
	       RemoveNode(n);
Packit 8f70b4
	       to_remove--;
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
	 LogNote(9,"node count=%d",nodes.count());
Packit 8f70b4
      }
Packit 8f70b4
      for(int i=1; i
Packit 8f70b4
	 if(routes[i]->nodes.count()>K) {
Packit 8f70b4
	    xarray<Node*>& nodes=routes[i]->nodes;
Packit 8f70b4
	    int q_num=PingQuestionable(nodes,nodes.count()-K);
Packit 8f70b4
	    if(nodes.count()>K+q_num)
Packit 8f70b4
	       routes[i]->RemoveNode(K); // too many candidates, trim one
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      // remove bad peers
Packit 8f70b4
      for(KnownTorrent *t=torrents.each_begin(); t; t=torrents.each_next()) {
Packit 8f70b4
	 xarray_p<Peer>& p=t->peers;
Packit 8f70b4
	 int seeds=0;
Packit 8f70b4
	 for(int i=0; i
Packit 8f70b4
	    if(!p[i]->IsGood())
Packit 8f70b4
	       p.remove(i--);
Packit 8f70b4
	    else
Packit 8f70b4
	       seeds+=p[i]->seed;
Packit 8f70b4
	 }
Packit 8f70b4
	 LogNote(9,"torrent %s has %d known peers (%d seeds)",
Packit 8f70b4
	    torrents.each_key().hexdump(),p.count(),seeds);
Packit 8f70b4
	 if(p.count()==0)
Packit 8f70b4
	    torrents.remove(torrents.each_key());
Packit 8f70b4
      }
Packit 8f70b4
      nodes_cleanup_timer.Reset();
Packit 8f70b4
      if(save_timer.Stopped()) {
Packit 8f70b4
	 Save();
Packit 8f70b4
	 save_timer.Reset();
Packit 8f70b4
      }
Packit 8f70b4
      if(nodes.count()>0 && routes[0]->nodes.count()<2 && search.count()==0)
Packit 8f70b4
	 Bootstrap(); // lost almost all nodes, try to bootstrap again
Packit 8f70b4
   }
Packit 8f70b4
   if(refresh_timer.Stopped()) {
Packit 8f70b4
      for(int i=0; i
Packit 8f70b4
	 if(!routes[i]->IsFresh()) {
Packit 8f70b4
	    LogNote(9,"refreshing route bucket %d (prefix=%s)",i,routes[i]->to_string());
Packit 8f70b4
	    // make random id in the range
Packit 8f70b4
	    int bytes=routes[i]->prefix_bits/8;
Packit 8f70b4
	    int bits=routes[i]->prefix_bits%8;
Packit 8f70b4
	    xstring random_id(routes[i]->prefix.get(),bytes+(bits>0));
Packit 8f70b4
	    if(bits>0) {
Packit 8f70b4
	       unsigned mask=(1<<(8-bits))-1;
Packit 8f70b4
	       assert(!(random_id[bytes]&mask));
Packit 8f70b4
	       random_id.get_non_const()[bytes]|=(random()/13)&mas;;
Packit 8f70b4
	    }
Packit 8f70b4
	    while(random_id.length()<20)
Packit 8f70b4
	       random_id.append(char(random()/13));
Packit 8f70b4
	    StartSearch(new Search(random_id));
Packit 8f70b4
	    routes[i]->fresh_timer.Reset();
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      refresh_timer.Reset();
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   // manual bootstrapping
Packit 8f70b4
   if(resolver) {
Packit 8f70b4
      if(resolver->Error()) {
Packit 8f70b4
	 LogError(1,"%s",resolver->ErrorMsg());
Packit 8f70b4
	 resolver=0;
Packit 8f70b4
	 m=MOVED;
Packit 8f70b4
      } else if(resolver->Done()) {
Packit 8f70b4
	 const xarray<sockaddr_u>& r=resolver->Result();
Packit 8f70b4
	 for(int i=0; i
Packit 8f70b4
	    Torrent::GetDHT(r[i])->SendPing(r[i]);
Packit 8f70b4
	 resolver=0;
Packit 8f70b4
	 m=MOVED;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   if(!state_io && !resolver && bootstrap_nodes.count()>0) {
Packit 8f70b4
      xstring &b=*bootstrap_nodes.next();
Packit 8f70b4
      ParsedURL u(b);
Packit 8f70b4
      if(u.proto==0 && u.host)
Packit 8f70b4
	 resolver=new Resolver(u.host,u.port,"6881");
Packit 8f70b4
      m=MOVED;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   while(send_queue.count()>0 && MaySendMessage()) {
Packit 8f70b4
      SendMessage(send_queue.next().borrow());
Packit 8f70b4
      m=MOVED;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   return m;
Packit 8f70b4
}
Packit 8f70b4
BeNode *DHT::NewQuery(const char *q,xmap_p<BeNode>& a)
Packit 8f70b4
{
Packit 8f70b4
   xmap_p<BeNode> m;
Packit 8f70b4
   BeNode *n=new BeNode((const char*)&t,sizeof(t));
Packit 8f70b4
   m.add("t",n);
Packit 8f70b4
   t++;
Packit 8f70b4
   m.add("y",new BeNode("q",1));
Packit 8f70b4
   m.add("q",new BeNode(q));
Packit 8f70b4
   a.add("id",new BeNode(node_id));
Packit 8f70b4
   m.add("a",new BeNode(&a);;
Packit 8f70b4
   return new BeNode(&m);
Packit 8f70b4
}
Packit 8f70b4
BeNode *DHT::NewReply(const xstring& t0,xmap_p<BeNode>& r)
Packit 8f70b4
{
Packit 8f70b4
   xmap_p<BeNode> m;
Packit 8f70b4
   m.add("t",new BeNode(t0));
Packit 8f70b4
   m.add("y",new BeNode("r",1));
Packit 8f70b4
   r.add("id",new BeNode(node_id));
Packit 8f70b4
   m.add("r",new BeNode(&r);;
Packit 8f70b4
   return new BeNode(&m);
Packit 8f70b4
}
Packit 8f70b4
BeNode *DHT::NewError(const xstring& t0,int code,const char *msg)
Packit 8f70b4
{
Packit 8f70b4
   xmap_p<BeNode> m;
Packit 8f70b4
   m.add("t",new BeNode(t0));
Packit 8f70b4
   m.add("y",new BeNode("e",1));
Packit 8f70b4
   xarray_p<BeNode> e;
Packit 8f70b4
   e.append(new BeNode(code));
Packit 8f70b4
   e.append(new BeNode(msg));
Packit 8f70b4
   m.add("e",new BeNode(&e);;
Packit 8f70b4
   return new BeNode(&m);
Packit 8f70b4
}
Packit 8f70b4
const char *DHT::MessageType(BeNode *q)
Packit 8f70b4
{
Packit 8f70b4
   const xstring& y=q->lookup_str("y");
Packit 8f70b4
   const char *msg_type="message";
Packit 8f70b4
   if(y.eq("q"))
Packit 8f70b4
      msg_type=q->lookup_str("q");
Packit 8f70b4
   else if(y.eq("r"))
Packit 8f70b4
      msg_type="response";
Packit 8f70b4
   else if(y.eq("e"))
Packit 8f70b4
      msg_type="error";
Packit 8f70b4
   return msg_type;
Packit 8f70b4
}
Packit 8f70b4
void DHT::SendMessage(BeNode *q,const sockaddr_u& a,const xstring& id)
Packit 8f70b4
{
Packit 8f70b4
   if(send_queue.count()>MAX_SEND_QUEUE) {
Packit 8f70b4
      LogError(9,"tail dropping output message");
Packit 8f70b4
      delete q;
Packit 8f70b4
      return;
Packit 8f70b4
   }
Packit 8f70b4
   send_queue.push(new Request(q,a,id));
Packit 8f70b4
}
Packit 8f70b4
void DHT::SendMessage(Request *req)
Packit 8f70b4
{
Packit 8f70b4
   req->expire_timer.Reset();
Packit 8f70b4
   BeNode *q=req->data.get_non_const();
Packit 8f70b4
   const sockaddr_u& a=req->addr;
Packit 8f70b4
   LogSend(4,xstring::format("sending DHT %s to %s %s",MessageType(q),
Packit 8f70b4
      a.to_string(),q->Format1()));
Packit 8f70b4
   int res=-1;
Packit 8f70b4
   res=Torrent::GetUDPSocket(af)->SendUDP(a,q->Pack());
Packit 8f70b4
   if(res!=-1 && q->lookup_str("y").eq("q")) {
Packit 8f70b4
      sent_req.add(q->lookup_str("t"),req);
Packit 8f70b4
      rate_limit.BytesPut(res);
Packit 8f70b4
   } else {
Packit 8f70b4
      delete req;
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
bool DHT::MaySendMessage()
Packit 8f70b4
{
Packit 8f70b4
   return rate_limit.BytesAllowedToPut()>=256
Packit 8f70b4
      && Torrent::GetUDPSocket(af)->MaySendUDP();
Packit 8f70b4
}
Packit 8f70b4
void DHT::SendPing(const sockaddr_u& a,const xstring& id)
Packit 8f70b4
{
Packit 8f70b4
   if(a.port()==0 || a.is_private() || a.is_reserved() || a.is_multicast())
Packit 8f70b4
      return;
Packit 8f70b4
   Enter();
Packit 8f70b4
   xmap_p<BeNode> arg;
Packit 8f70b4
   SendMessage(NewQuery("ping",arg),a,id);
Packit 8f70b4
   Leave();
Packit 8f70b4
}
Packit 8f70b4
void DHT::SendPing(Node *n)
Packit 8f70b4
{
Packit 8f70b4
   SendPing(n->addr,n->id);
Packit 8f70b4
   n->ping_timer.Reset();
Packit 8f70b4
}
Packit 8f70b4
void DHT::AnnouncePeer(const Torrent *t)
Packit 8f70b4
{
Packit 8f70b4
   const xstring& info_hash=t->GetInfoHash();
Packit 8f70b4
   // check for duplicated announce
Packit 8f70b4
   if(search.exists(info_hash))
Packit 8f70b4
      return;
Packit 8f70b4
   Enter();
Packit 8f70b4
   Search *s=new Search(info_hash);
Packit 8f70b4
   s->WantPeers(t->Complete());
Packit 8f70b4
#if INET6
Packit 8f70b4
   // try to find nodes in the other AF if needed
Packit 8f70b4
   if(Torrent::GetDHT(af==AF_INET?AF_INET6:AF_INET)->nodes.count()<1)
Packit 8f70b4
      s->Bootstrap();
Packit 8f70b4
#endif
Packit 8f70b4
   StartSearch(s);
Packit 8f70b4
   Leave();
Packit 8f70b4
}
Packit 8f70b4
void DHT::DenouncePeer(const Torrent *t)
Packit 8f70b4
{
Packit 8f70b4
   // remove any search for this torrent
Packit 8f70b4
   search.remove(t->GetInfoHash());
Packit 8f70b4
}
Packit 8f70b4
int DHT::AddNodesToReply(xmap_p<BeNode> &r,const xstring& target,int max_count)
Packit 8f70b4
{
Packit 8f70b4
   xarray<Node*> n;
Packit 8f70b4
   FindNodes(target,n,max_count,true);
Packit 8f70b4
   xstring compact_nodes;
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      compact_nodes.append(n[i]->id);
Packit 8f70b4
      compact_nodes.append(n[i]->addr.compact());
Packit 8f70b4
   }
Packit 8f70b4
   r.add(af==AF_INET?"nodes":"nodes6",new BeNode(compact_nodes));
Packit 8f70b4
   return n.count();
Packit 8f70b4
}
Packit 8f70b4
int DHT::AddNodesToReply(xmap_p<BeNode> &r,const xstring& target,bool want_n4,bool want_n6)
Packit 8f70b4
{
Packit 8f70b4
   int nodes_count=0;
Packit 8f70b4
   if(want_n4)
Packit 8f70b4
      nodes_count+=Torrent::GetDHT(AF_INET)->AddNodesToReply(r,target,K);
Packit 8f70b4
   if(want_n6)
Packit 8f70b4
      nodes_count+=Torrent::GetDHT(AF_INET6)->AddNodesToReply(r,target,K);
Packit 8f70b4
   return nodes_count;
Packit 8f70b4
}
Packit 8f70b4
const xstring& DHT::Node::GetToken()
Packit 8f70b4
{
Packit 8f70b4
   if(!my_token || token_timer.Stopped()) {
Packit 8f70b4
      // make new token
Packit 8f70b4
      my_last_token.set(my_token);
Packit 8f70b4
      my_token.truncate();
Packit 8f70b4
      for(int i=0; i<16; i++)
Packit 8f70b4
	 my_token.append(char(random()/13));
Packit 8f70b4
      token_timer.Reset();
Packit 8f70b4
   }
Packit 8f70b4
   return my_token;
Packit 8f70b4
}
Packit 8f70b4
bool DHT::Node::TokenIsValid(const xstring& token) const
Packit 8f70b4
{
Packit 8f70b4
   if(!token || !my_token || token_timer.Stopped())
Packit 8f70b4
      return false;
Packit 8f70b4
   return token.eq(my_token) || token.eq(my_last_token);
Packit 8f70b4
}
Packit 8f70b4
const xstring& DHT::Request::GetSearchTarget() const
Packit 8f70b4
{
Packit 8f70b4
   const BeNode *a=data->lookup("a",BeNode::BE_DICT);
Packit 8f70b4
   if(!a)
Packit 8f70b4
      return xstring::null;
Packit 8f70b4
   const xstring& q=data->lookup_str("q");
Packit 8f70b4
   const char *target=q.eq("find_node")?"target":"info_hash";
Packit 8f70b4
   return a->lookup_str(target);
Packit 8f70b4
}
Packit 8f70b4
void DHT::HandlePacket(BeNode *p,const sockaddr_u& src)
Packit 8f70b4
{
Packit 8f70b4
   LogRecv(4,xstring::format("received DHT %s from %s %s",MessageType(p),
Packit 8f70b4
      src.to_string(),p->Format1()));
Packit 8f70b4
   int pkt_len=p->str.length();
Packit 8f70b4
   const xstring& t=p->lookup_str("t");
Packit 8f70b4
   if(!t)
Packit 8f70b4
      return;
Packit 8f70b4
   const xstring& y=p->lookup_str("y");
Packit 8f70b4
   if(!y)
Packit 8f70b4
      return;
Packit 8f70b4
   if(y.eq("q")) { // query
Packit 8f70b4
      if(rate_limit.BytesAllowedToGet()
Packit 8f70b4
	 LogError(9,"dropping incoming message (rate limit exceeded)");
Packit 8f70b4
	 return;
Packit 8f70b4
      }
Packit 8f70b4
      rate_limit.BytesGot(pkt_len);
Packit 8f70b4
      const xstring& q=p->lookup_str("q");
Packit 8f70b4
      if(!q)
Packit 8f70b4
	 return;
Packit 8f70b4
      BeNode *a=p->lookup("a",BeNode::BE_DICT);
Packit 8f70b4
      if(!a)
Packit 8f70b4
	 return;
Packit 8f70b4
      const xstring& id=a->lookup_str("id");
Packit 8f70b4
      if(id.length()!=20)
Packit 8f70b4
	 return;
Packit 8f70b4
      Node *node=FoundNode(id,src,false);
Packit 8f70b4
      if(!node)
Packit 8f70b4
	 return;
Packit 8f70b4
Packit 8f70b4
      xmap_p<BeNode> r;
Packit 8f70b4
      if(src.family()==AF_INET)
Packit 8f70b4
	 r.add("ip",new BeNode(src.compact_addr()));
Packit 8f70b4
Packit 8f70b4
      bool want_n4=false;
Packit 8f70b4
      bool want_n6=false;
Packit 8f70b4
      BeNode *want=a->lookup("want",BeNode::BE_LIST);
Packit 8f70b4
      if(want) {
Packit 8f70b4
	 for(int i=0; i<want->list.count(); i++) {
Packit 8f70b4
	    BeNode *w=want->list[i];
Packit 8f70b4
	    if(w->type!=BeNode::BE_STR)
Packit 8f70b4
	       continue;
Packit 8f70b4
	    if(w->str.eq("n4"))
Packit 8f70b4
	       want_n4=true;
Packit 8f70b4
	    if(w->str.eq("n6"))
Packit 8f70b4
	       want_n6=true;
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      if(!want_n4 && !want_n6) {
Packit 8f70b4
	 want_n4=(src.family()==AF_INET);
Packit 8f70b4
	 want_n6=(src.family()==AF_INET6);
Packit 8f70b4
      }
Packit 8f70b4
Packit 8f70b4
      if(q.eq("ping")) {
Packit 8f70b4
	 LogSend(5,xstring::format("DHT ping reply to %s",src.to_string()));
Packit 8f70b4
	 SendMessage(NewReply(t,r),src);
Packit 8f70b4
      } else if(q.eq("find_node")) {
Packit 8f70b4
	 const xstring& target=a->lookup_str("target");
Packit 8f70b4
	 if(!target)
Packit 8f70b4
	    return;
Packit 8f70b4
	 int nodes_count=AddNodesToReply(r,target,want_n4,want_n6);
Packit 8f70b4
	 LogSend(5,xstring::format("DHT find_node reply with %d nodes to %s",nodes_count,src.to_string()));
Packit 8f70b4
	 SendMessage(NewReply(t,r),src);
Packit 8f70b4
      } else if(q.eq("get_peers")) {
Packit 8f70b4
	 const xstring& info_hash=a->lookup_str("info_hash");
Packit 8f70b4
	 if(info_hash.length()!=20)
Packit 8f70b4
	    return;
Packit 8f70b4
	 bool noseed=a->lookup_int("noseed");
Packit 8f70b4
	 KnownTorrent *torrent=torrents.lookup(info_hash);
Packit 8f70b4
	 int nodes_count=0;
Packit 8f70b4
	 int values_count=0;
Packit 8f70b4
	 if(!torrent || torrent->peers.count()==0) {
Packit 8f70b4
	    nodes_count=AddNodesToReply(r,info_hash,want_n4,want_n6);
Packit 8f70b4
	 } else {
Packit 8f70b4
	    xarray_p<Peer>& p=torrent->peers;
Packit 8f70b4
	    xarray_p<BeNode> values;
Packit 8f70b4
	    for(int i=0; i
Packit 8f70b4
	       Peer *peer=p[i];
Packit 8f70b4
	       if(noseed && peer->seed)
Packit 8f70b4
		  continue;
Packit 8f70b4
	       if(!peer->IsGood())
Packit 8f70b4
		  continue;
Packit 8f70b4
	       if(peer->compact_addr.family()==AF_INET && !want_n4)
Packit 8f70b4
		  continue;
Packit 8f70b4
#if INET6
Packit 8f70b4
	       if(peer->compact_addr.family()==AF_INET6 && !want_n6)
Packit 8f70b4
		  continue;
Packit 8f70b4
#endif
Packit 8f70b4
	       values.append(new BeNode(peer->compact_addr));
Packit 8f70b4
	       values_count++;
Packit 8f70b4
	    }
Packit 8f70b4
	    if(values_count>0)
Packit 8f70b4
	       r.add("values",new BeNode(&values));
Packit 8f70b4
	    else
Packit 8f70b4
	       nodes_count=AddNodesToReply(r,info_hash,want_n4,want_n6);
Packit 8f70b4
	 }
Packit 8f70b4
	 r.add("token",new BeNode(node->GetToken()));
Packit 8f70b4
	 LogSend(5,xstring::format("DHT get_peers reply with %d values and %d nodes to %s",
Packit 8f70b4
	    values_count,nodes_count,src.to_string()));
Packit 8f70b4
	 SendMessage(NewReply(t,r),src);
Packit 8f70b4
      } else if(q.eq("announce_peer")) {
Packit 8f70b4
	 // need a valid token
Packit 8f70b4
	 if(!node->TokenIsValid(a->lookup_str("token"))) {
Packit 8f70b4
	    SendMessage(NewError(t,ERR_PROTOCOL,"invalid token"),src);
Packit 8f70b4
	    return;
Packit 8f70b4
	 }
Packit 8f70b4
	 // ok, token is valid. Now add the peer.
Packit 8f70b4
	 const xstring& info_hash=a->lookup_str("info_hash");
Packit 8f70b4
	 if(info_hash.length()!=20)
Packit 8f70b4
	    return;
Packit 8f70b4
	 int port=a->lookup_int("port");
Packit 8f70b4
	 if(!port)
Packit 8f70b4
	    return;
Packit 8f70b4
	 bool seed=a->lookup_int("seed");
Packit 8f70b4
	 sockaddr_u peer_addr(src);
Packit 8f70b4
	 peer_addr.set_port(port);
Packit 8f70b4
	 AddPeer(info_hash,peer_addr.compact(),seed);
Packit 8f70b4
	 SendMessage(NewReply(t,r),src);
Packit 8f70b4
      } else if(q.eq("vote")) {
Packit 8f70b4
#if 0
Packit 8f70b4
	 // need a valid token
Packit 8f70b4
	 if(!node->TokenIsValid(a->lookup_str("token"))) {
Packit 8f70b4
	    SendMessage(NewError(t,ERR_PROTOCOL,"invalid token"),src);
Packit 8f70b4
	    return;
Packit 8f70b4
	 }
Packit 8f70b4
	 // target is sha1(info_hash+"rating")
Packit 8f70b4
	 const xstring& target=a->lookup_str("target");
Packit 8f70b4
	 if(target.length()!=20)
Packit 8f70b4
            return;
Packit 8f70b4
         unsigned vote=a->lookup_int("vote");
Packit 8f70b4
	 // store the vote
Packit 8f70b4
	 // return what?
Packit 8f70b4
	 SendMessage(NewReply(t,r),src);
Packit 8f70b4
#endif
Packit 8f70b4
      } else {
Packit 8f70b4
	 SendMessage(NewError(t,ERR_UNKNOWN_METHOD,"method unknown"),src);
Packit 8f70b4
      }
Packit 8f70b4
      return;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   Ref<Request> req(sent_req.borrow(t));
Packit 8f70b4
   if(!req) {
Packit 8f70b4
      LogError(2,"got DHT reply with unknown `t' from %s",src.to_string());
Packit 8f70b4
      return;
Packit 8f70b4
   }
Packit 8f70b4
   if(req->addr!=src) {
Packit 8f70b4
      LogError(2,"got DHT reply from %s instead of %s",src.to_string(),req->addr.to_string());
Packit 8f70b4
      return;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   const xstring& q=req->data->lookup_str("q");
Packit 8f70b4
   if(y.eq("r")) { // reply
Packit 8f70b4
      BeNode *r=p->lookup("r",BeNode::BE_DICT);
Packit 8f70b4
      if(!r)
Packit 8f70b4
	 return;
Packit 8f70b4
      const xstring& id=r->lookup_str("id");
Packit 8f70b4
      if(id.length()!=20)
Packit 8f70b4
	 return;
Packit 8f70b4
Packit 8f70b4
      Node *node=FoundNode(id,src,true);
Packit 8f70b4
      if(!node)
Packit 8f70b4
	 return;
Packit 8f70b4
Packit 8f70b4
      const sockaddr_compact& ip=sockaddr_compact::cast(r->lookup_str("ip"));
Packit 8f70b4
      if(ip && !ValidNodeId(node_id,ip)) {
Packit 8f70b4
	 const xstring &src_ip=xstring::get_tmp(src.address());
Packit 8f70b4
	 if(src_ip.eq(ip.address())) {
Packit 8f70b4
	    LogError(2,"%s incorrectly reported our IP as %s",src.to_string(),ip.address());
Packit 8f70b4
	    black_list.Add(src,"1d");
Packit 8f70b4
	    return;
Packit 8f70b4
	 } else if(!ip_voted.lookup(src_ip)) {
Packit 8f70b4
	    ip_voted.add(src_ip,true);
Packit 8f70b4
	    unsigned& votes=ip_votes.lookup_Lv(ip);
Packit 8f70b4
	    votes++;
Packit 8f70b4
	    LogNote(2,"%s reported our IP as %s (votes=%u)",src.to_string(),ip.address(),votes);
Packit 8f70b4
	    if(votes>=4) {
Packit 8f70b4
	       // we have incorrect node_id, restart with correct one.
Packit 8f70b4
	       MakeNodeId(node_id,ip);
Packit 8f70b4
	       LogNote(0,"restarting DHT with new id %s",node_id.hexdump());
Packit 8f70b4
	       Restart();
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
Packit 8f70b4
      if(q.eq("get_peers")) {
Packit 8f70b4
	 const xstring& info_hash=req->GetSearchTarget();
Packit 8f70b4
	 Torrent *torrent=Torrent::FindTorrent(info_hash);
Packit 8f70b4
	 BeNode *values=r->lookup("values",BeNode::BE_LIST);
Packit 8f70b4
	 if(values) {
Packit 8f70b4
	    // some peers found.
Packit 8f70b4
	    for(int i=0; i<values->list.count(); i++) {
Packit 8f70b4
	       if(values->list[i]->type!=BeNode::BE_STR)
Packit 8f70b4
		  continue;
Packit 8f70b4
	       const sockaddr_compact &c=sockaddr_compact::cast(values->list[i]->str);
Packit 8f70b4
	       sockaddr_u a(c);
Packit 8f70b4
	       if(!a.port())
Packit 8f70b4
		  continue;
Packit 8f70b4
	       LogNote(9,"found peer %s for info_hash=%s",a.to_string(),info_hash.hexdump());
Packit 8f70b4
	       if(torrent)
Packit 8f70b4
		  torrent->AddPeer(new TorrentPeer(torrent,&a,TorrentPeer::TR_DHT));
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
	 const xstring& token=r->lookup_str("token");
Packit 8f70b4
	 if(token && torrent) {
Packit 8f70b4
	    if(!ValidNodeId(id,src.compact_addr()))
Packit 8f70b4
	       LogError(2,"warning: node id %s is invalid for %s",id.hexdump(),src.address());
Packit 8f70b4
	    // announce the torrent
Packit 8f70b4
	    int port=torrent->GetPortIPv4();
Packit 8f70b4
#if INET6
Packit 8f70b4
	    if(src.family()==AF_INET6)
Packit 8f70b4
	       port=torrent->GetPortIPv6();
Packit 8f70b4
#endif
Packit 8f70b4
	    xmap_p<BeNode> a;
Packit 8f70b4
	    a.add("info_hash",new BeNode(info_hash));
Packit 8f70b4
	    a.add("port",new BeNode(port));
Packit 8f70b4
	    a.add("token",new BeNode(token));
Packit 8f70b4
	    if(torrent->Complete())
Packit 8f70b4
	       a.add("seed",new BeNode(1));
Packit 8f70b4
	    SendMessage(NewQuery("announce_peer",a),src,id);
Packit 8f70b4
	    torrent->DHT_Announced(src.family());
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      if(q.eq("find_node") || q.eq("get_peers")) {
Packit 8f70b4
	 const xstring& target=req->GetSearchTarget();
Packit 8f70b4
	 const xstring &node_id=req->GetNodeId();
Packit 8f70b4
	 LogNote(9,"got reply for %s with target %s from node %s",q.get(),target.hexdump(),node_id.hexdump());
Packit 8f70b4
Packit 8f70b4
	 Search *s=search.lookup(target);
Packit 8f70b4
	 if(s && s->IsFeasible(node_id)) {
Packit 8f70b4
	    s->best_node_id.set(node_id);
Packit 8f70b4
	    s->depth++;
Packit 8f70b4
	    LogNote(9,"search for %s goes to depth=%d with best_node_id=%s",
Packit 8f70b4
	       s->target_id.hexdump(),s->depth,node_id.hexdump());
Packit 8f70b4
	 }
Packit 8f70b4
Packit 8f70b4
	 const xstring& nodes=r->lookup_str("nodes");
Packit 8f70b4
	 if(nodes) {
Packit 8f70b4
	    LogNote(9,"adding %d nodes",(int)nodes.length()/26);
Packit 8f70b4
	    const char *data=nodes;
Packit 8f70b4
	    int len=nodes.length();
Packit 8f70b4
	    while(len>=26) {
Packit 8f70b4
	       xstring id(data,20);
Packit 8f70b4
	       sockaddr_u a;
Packit 8f70b4
	       a.set_compact(data+20,6);
Packit 8f70b4
	       data+=26;
Packit 8f70b4
	       len-=26;
Packit 8f70b4
	       Node *new_node=FoundNode(id,a,false,s);
Packit 8f70b4
	       if(new_node)
Packit 8f70b4
		  new_node->SetOrigin(node);
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
#if INET6
Packit 8f70b4
	 const xstring& nodes6=r->lookup_str("nodes6");
Packit 8f70b4
	 if(nodes6) {
Packit 8f70b4
	    LogNote(9,"adding %d nodes6",(int)nodes6.length()/38);
Packit 8f70b4
	    const char *data=nodes6;
Packit 8f70b4
	    int len=nodes6.length();
Packit 8f70b4
	    while(len>=38) {
Packit 8f70b4
	       xstring id(data,20);
Packit 8f70b4
	       sockaddr_u a;
Packit 8f70b4
	       a.set_compact(data+20,18);
Packit 8f70b4
	       data+=38;
Packit 8f70b4
	       len-=38;
Packit 8f70b4
	       Node *new_node=FoundNode(id,a,false,s);
Packit 8f70b4
	       if(new_node)
Packit 8f70b4
		  new_node->SetOrigin(node);
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
#endif //INET6
Packit 8f70b4
      }
Packit 8f70b4
   } else if(y.eq("e")) { // error
Packit 8f70b4
      int code=0;
Packit 8f70b4
      const char *msg="unknown";
Packit 8f70b4
      BeNode *e=p->lookup("e",BeNode::BE_LIST);
Packit 8f70b4
      if(e) {
Packit 8f70b4
	 if(e->list.count()>=1 && e->list[0]->type==BeNode::BE_INT)
Packit 8f70b4
	    code=e->list[0]->num;
Packit 8f70b4
	 if(e->list.count()>=2 && e->list[1]->type==BeNode::BE_STR)
Packit 8f70b4
	    msg=e->list[1]->str;
Packit 8f70b4
      }
Packit 8f70b4
      LogError(2,"got DHT error for %s (%d: %s) from %s",q.get(),code,msg,src.to_string());
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
bool DHT::Search::IsFeasible(const xstring &id) const
Packit 8f70b4
{
Packit 8f70b4
   if(!best_node_id)
Packit 8f70b4
      return true;
Packit 8f70b4
   for(int i=0; i<20; i++) {
Packit 8f70b4
      unsigned char a=id[i]^target_id[i];
Packit 8f70b4
      unsigned char b=best_node_id[i]^target_id[i];
Packit 8f70b4
      if(a
Packit 8f70b4
	 return true;
Packit 8f70b4
      if(a>b)
Packit 8f70b4
	 return false;
Packit 8f70b4
   }
Packit 8f70b4
   return false;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::Search::ContinueOn(DHT *d,const Node *n)
Packit 8f70b4
{
Packit 8f70b4
   if(searched.exists(n->id)) {
Packit 8f70b4
      LogNote(9,"skipping search on %s, already searched",n->GetName());
Packit 8f70b4
      return;
Packit 8f70b4
   }
Packit 8f70b4
   LogNote(3,"search for %s continues on %s (%s) depth=%d",
Packit 8f70b4
      target_id.hexdump(),n->id.hexdump(),n->GetName(),depth);
Packit 8f70b4
Packit 8f70b4
   xmap_p<BeNode> a;
Packit 8f70b4
#if INET6
Packit 8f70b4
   if(bootstrap) {
Packit 8f70b4
	 xarray_p<BeNode> want;
Packit 8f70b4
	 want.append(new BeNode("n4"));
Packit 8f70b4
	 want.append(new BeNode("n6"));
Packit 8f70b4
	 a.add("want",new BeNode(&want));
Packit 8f70b4
   }
Packit 8f70b4
#endif
Packit 8f70b4
   if(!want_peers) {
Packit 8f70b4
      a.add("target",new BeNode(target_id));
Packit 8f70b4
      d->SendMessage(d->NewQuery("find_node",a),n->addr,n->id);
Packit 8f70b4
   } else {
Packit 8f70b4
      a.add("info_hash",new BeNode(target_id));
Packit 8f70b4
      if(noseed)
Packit 8f70b4
	 a.add("noseed",new BeNode(1));
Packit 8f70b4
      d->SendMessage(d->NewQuery("get_peers",a),n->addr,n->id);
Packit 8f70b4
   }
Packit 8f70b4
   searched.add(n->id,true);
Packit 8f70b4
   search_timer.Reset();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::StartSearch(Search *s)
Packit 8f70b4
{
Packit 8f70b4
   LogNote(9,"starting search for %s",s->target_id.hexdump());
Packit 8f70b4
   xarray<Node*> n;
Packit 8f70b4
   FindNodes(s->target_id,n,K,true);
Packit 8f70b4
   if(n.count()<=K/2) {
Packit 8f70b4
      LogNote(2,"too few good nodes found in the routing table");
Packit 8f70b4
      FindNodes(s->target_id,n,K,false);
Packit 8f70b4
      if(n.count()==0)
Packit 8f70b4
	 LogError(1,"no nodes found in the routing table");
Packit 8f70b4
   }
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      s->ContinueOn(this,n[i]);
Packit 8f70b4
   search.add(s->target_id,s);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::RestartSearch(Search *s)
Packit 8f70b4
{
Packit 8f70b4
   xarray<Node*> n;
Packit 8f70b4
   FindNodes(s->target_id,n,K,true,&s->searched);
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      s->ContinueOn(this,n[i]);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
DHT::Node *DHT::GetOrigin(const Node *n) {
Packit 8f70b4
   if(!n->origin_id)
Packit 8f70b4
      return 0;
Packit 8f70b4
   Node *origin=nodes.lookup(n->origin_id);
Packit 8f70b4
   if(!origin || origin==n)
Packit 8f70b4
      return 0;
Packit 8f70b4
   return origin;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
DHT::Node *DHT::FoundNode(const xstring& id,const sockaddr_u& a,bool responded,Search *s)
Packit 8f70b4
{
Packit 8f70b4
   if(a.port()==0 || a.is_private() || a.is_reserved() || a.is_multicast()) {
Packit 8f70b4
      LogError(9,"node address %s is not valid",a.to_string());
Packit 8f70b4
      return 0;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   if(a.family()!=af)
Packit 8f70b4
      return 0;
Packit 8f70b4
Packit 8f70b4
   if(id.eq(node_id)) {
Packit 8f70b4
      LogNote(9,"node %s has our own id",a.to_string());
Packit 8f70b4
      return 0;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   if(black_list.Listed(a)) {
Packit 8f70b4
      LogNote(9,"node %s is blacklisted",a.to_string());
Packit 8f70b4
      return 0;
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   Node *n=nodes.lookup(id);
Packit 8f70b4
   if(!n) {
Packit 8f70b4
      n=node_by_addr.lookup(a.compact());
Packit 8f70b4
      if(n) {
Packit 8f70b4
	 if(!responded) {
Packit 8f70b4
	    // change node id only by a message from the node.
Packit 8f70b4
	    return 0;
Packit 8f70b4
	 }
Packit 8f70b4
	 if(n->id_change_count>0) {
Packit 8f70b4
	    LogError(9,"%s changes node id again",n->addr.to_string());
Packit 8f70b4
	    BlackListNode(n,"1d");
Packit 8f70b4
	    return 0;
Packit 8f70b4
	 }
Packit 8f70b4
	 ChangeNodeId(n,id);
Packit 8f70b4
      } else {
Packit 8f70b4
	 n=new Node(id,a);
Packit 8f70b4
	 AddNode(n);
Packit 8f70b4
      }
Packit 8f70b4
   } else {
Packit 8f70b4
      AddRoute(n);
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   if(responded) {
Packit 8f70b4
      n->responded=true;
Packit 8f70b4
      n->ResetLostPing();
Packit 8f70b4
      Node *origin=GetOrigin(n);
Packit 8f70b4
      if(origin)
Packit 8f70b4
	 origin->bad_node_count/=2;
Packit 8f70b4
   }
Packit 8f70b4
   if(n->responded)
Packit 8f70b4
      n->SetGood();
Packit 8f70b4
Packit 8f70b4
   // continue search
Packit 8f70b4
   if(s && s->IsFeasible(n))
Packit 8f70b4
      s->ContinueOn(this,n);
Packit 8f70b4
Packit 8f70b4
   return n;
Packit 8f70b4
}
Packit 8f70b4
void DHT::RemoveNode(Node *n)
Packit 8f70b4
{
Packit 8f70b4
   Node *origin=GetOrigin(n);
Packit 8f70b4
   if(origin && !n->responded && n->ping_lost_count>1) {
Packit 8f70b4
      origin->bad_node_count++;
Packit 8f70b4
      if(origin->bad_node_count>2*K)
Packit 8f70b4
	 BlackListNode(origin,"1h");
Packit 8f70b4
   }
Packit 8f70b4
   RemoveRoute(n);
Packit 8f70b4
   node_by_addr.remove(n->addr.compact());
Packit 8f70b4
   nodes.remove(n->id);
Packit 8f70b4
}
Packit 8f70b4
void DHT::ChangeNodeId(Node *n,const xstring& new_node_id)
Packit 8f70b4
{
Packit 8f70b4
   LogNote(1,"node_id changed for %s, old_node_id=%s, new_node_id=%s",
Packit 8f70b4
      n->GetName(),n->id.hexdump(),new_node_id.hexdump());
Packit 8f70b4
   n->id_change_count++;
Packit 8f70b4
Packit 8f70b4
   // change node_id in the in-flight requests
Packit 8f70b4
   for(Request *r=sent_req.each_begin(); r; r=sent_req.each_next()) {
Packit 8f70b4
      if(r->node_id.eq(n->id) && r->addr==n->addr)
Packit 8f70b4
	 r->node_id.set(new_node_id);
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   RemoveRoute(n);
Packit 8f70b4
   nodes.borrow(n->id);	// borrow to avoid freeing the node
Packit 8f70b4
   n->id.set(new_node_id);
Packit 8f70b4
   nodes.add(n->id,n);
Packit 8f70b4
   AddRoute(n);
Packit 8f70b4
}
Packit 8f70b4
void DHT::BlackListNode(Node *n,const char *timeout)
Packit 8f70b4
{
Packit 8f70b4
   black_list.Add(n->addr,timeout);
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      if(send_queue[i]->node_id.eq(n->id))
Packit 8f70b4
	 send_queue.remove(i);
Packit 8f70b4
   }
Packit 8f70b4
   for(const Request *r=sent_req.each_begin(); r; r=sent_req.each_next()) {
Packit 8f70b4
      if(r->GetNodeId().eq(n->id))
Packit 8f70b4
	 sent_req.remove(sent_req.each_key());
Packit 8f70b4
   }
Packit 8f70b4
   RemoveNode(n);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::AddNode(Node *n)
Packit 8f70b4
{
Packit 8f70b4
   assert(n->id.length()==20);
Packit 8f70b4
   assert(!nodes.exists(n->id));
Packit 8f70b4
   assert(!node_by_addr.exists(n->addr.compact()));
Packit 8f70b4
Packit 8f70b4
   nodes.add(n->id,n);
Packit 8f70b4
   node_by_addr.add(n->addr.compact(),n);
Packit 8f70b4
Packit 8f70b4
   AddRoute(n);
Packit 8f70b4
Packit 8f70b4
   if(nodes.count()==1 && search.count()==0 && !state_io)
Packit 8f70b4
      Bootstrap();
Packit 8f70b4
}
Packit 8f70b4
int DHT::FindRoute(const xstring& id,int i,int skew)
Packit 8f70b4
{
Packit 8f70b4
   // routes are ordered by prefix length decreasing
Packit 8f70b4
   // the first route bucket always matches our node_id
Packit 8f70b4
   for( ; i
Packit 8f70b4
      if(routes[i]->PrefixMatch(id,skew)) {
Packit 8f70b4
	 return i;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   return -1;
Packit 8f70b4
}
Packit 8f70b4
void DHT::RemoveRoute(Node *n)
Packit 8f70b4
{
Packit 8f70b4
   int i=FindRoute(n->id);
Packit 8f70b4
   if(i==-1)
Packit 8f70b4
      return;
Packit 8f70b4
   routes[i]->RemoveNode(n);
Packit 8f70b4
}
Packit 8f70b4
void DHT::AddRoute(Node *n)
Packit 8f70b4
{
Packit 8f70b4
try_again:
Packit 8f70b4
   int i=FindRoute(n->id);
Packit 8f70b4
   if(i==-1) {
Packit 8f70b4
      assert(routes.count()==0);
Packit 8f70b4
      routes.append(new RouteBucket(0,xstring::null));
Packit 8f70b4
      i=0;
Packit 8f70b4
   }
Packit 8f70b4
   const Ref<RouteBucket>& r=routes[i];
Packit 8f70b4
   xarray<Node*> &nodes=r->nodes;
Packit 8f70b4
   // check if the node is already in the bucket
Packit 8f70b4
   for(int j=0; j
Packit 8f70b4
      if(nodes[j]==n) {
Packit 8f70b4
	 // K nodes are stable, other nodes are candidates
Packit 8f70b4
	 if(j
Packit 8f70b4
	    // move the stable node to the end of K set
Packit 8f70b4
	    // and set the bucket as fresh.
Packit 8f70b4
	    r->SetFresh();
Packit 8f70b4
	    nodes.remove(j);
Packit 8f70b4
	    if(nodes.count()
Packit 8f70b4
	       nodes.append(n);
Packit 8f70b4
	    else
Packit 8f70b4
	       nodes.insert(n,K-1);
Packit 8f70b4
	    // remove a candidate
Packit 8f70b4
	 }
Packit 8f70b4
	 return;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   // remove a bad node to free space
Packit 8f70b4
   if(nodes.count()>=K) {
Packit 8f70b4
      for(int j=0; j
Packit 8f70b4
	 if(nodes[j]->IsBad()) {
Packit 8f70b4
	    r->RemoveNode(j);
Packit 8f70b4
	    break;
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   // prefer responded nodes
Packit 8f70b4
   if(i>0 && nodes.count()>=K && n->responded) {
Packit 8f70b4
      for(int j=0; j
Packit 8f70b4
	 if(!nodes[j]->responded) {
Packit 8f70b4
	    r->RemoveNode(j);
Packit 8f70b4
	    break;
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   // remove a non-good and not responded node to free space
Packit 8f70b4
   if(i>0 && nodes.count()>=K) {
Packit 8f70b4
      for(int j=0; j
Packit 8f70b4
	 if(!nodes[j]->IsGood() && !nodes[j]->responded) {
Packit 8f70b4
	    r->RemoveNode(j);
Packit 8f70b4
	    break;
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
Packit 8f70b4
   if(state_io && i==0 && nodes.count()>=K && SplitRoute0())
Packit 8f70b4
      goto try_again;
Packit 8f70b4
Packit 8f70b4
   if(nodes.count()>=K) {
Packit 8f70b4
      int q_num=PingQuestionable(nodes,nodes.count()-K+1);
Packit 8f70b4
      // check if we have already candidates for the questionable nodes
Packit 8f70b4
      if(nodes.count()>=K+q_num) {
Packit 8f70b4
	 if(i==0 && SplitRoute0())
Packit 8f70b4
	    goto try_again;
Packit 8f70b4
	 else
Packit 8f70b4
	    LogNote(9,"skipping node %s, route bucket %d (prefix=%s) has %d nodes",n->GetName(),i,r->to_string(),nodes.count());
Packit 8f70b4
	 return;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   r->SetFresh();
Packit 8f70b4
   LogNote(3,"adding node %s to route bucket %d (prefix=%s)",n->GetName(),i,r->to_string());
Packit 8f70b4
   n->in_routes=true;
Packit 8f70b4
   nodes.append(n);
Packit 8f70b4
}
Packit 8f70b4
bool DHT::SplitRoute0()
Packit 8f70b4
{
Packit 8f70b4
   const Ref<RouteBucket>& r=routes[0];
Packit 8f70b4
   xarray<Node*> &nodes=r->nodes;
Packit 8f70b4
Packit 8f70b4
   if(nodes.count()<K || r->prefix_bits>=160)
Packit 8f70b4
      return false;
Packit 8f70b4
   if(routes.count()>1 && !routes[1]->HasGoodNodes() && !state_io)
Packit 8f70b4
      return false;
Packit 8f70b4
Packit 8f70b4
   LogNote(9,"splitting route bucket 0, nodes=%d",nodes.count());
Packit 8f70b4
   int bits=r->prefix_bits;
Packit 8f70b4
   size_t byte=bits/8;
Packit 8f70b4
   unsigned mask = 1<<(7-bits%8);
Packit 8f70b4
   if(r->prefix.length()<=byte)
Packit 8f70b4
      r->prefix.append('\0');
Packit 8f70b4
   xstring p1(r->prefix.copy());
Packit 8f70b4
   xstring p2(r->prefix.copy());
Packit 8f70b4
   // new bit in p1 is already cleared.
Packit 8f70b4
   p2.get_non_const()[byte]|=mask; // set new bit in p2
Packit 8f70b4
   RouteBucket *b1=new RouteBucket(bits+1,p1);
Packit 8f70b4
   RouteBucket *b2=new RouteBucket(bits+1,p2);
Packit 8f70b4
   // distribute the nodes between two buckets
Packit 8f70b4
   for(int j=0; j
Packit 8f70b4
      if(nodes[j]->id[byte]&mask)
Packit 8f70b4
	 b2->nodes.append(nodes[j]);
Packit 8f70b4
      else
Packit 8f70b4
	 b1->nodes.append(nodes[j]);
Packit 8f70b4
   }
Packit 8f70b4
   if(node_id[byte]&mask) {
Packit 8f70b4
      routes[0]=b2;
Packit 8f70b4
      routes.insert(b1,1);
Packit 8f70b4
   } else {
Packit 8f70b4
      routes[0]=b1;
Packit 8f70b4
      routes.insert(b2,1);
Packit 8f70b4
   }
Packit 8f70b4
   LogNote(9,"new route[0] prefix=%s nodes=%d",routes[0]->to_string(),routes[0]->nodes.count());
Packit 8f70b4
   LogNote(9,"new route[1] prefix=%s nodes=%d",routes[1]->to_string(),routes[1]->nodes.count());
Packit 8f70b4
   assert(routes[0]->PrefixMatch(node_id));
Packit 8f70b4
   return true;
Packit 8f70b4
}
Packit 8f70b4
int DHT::PingQuestionable(const xarray<Node*>& nodes,int limit)
Packit 8f70b4
{
Packit 8f70b4
   int q_num=0;
Packit 8f70b4
   // ping questionable nodes, return the number of questionable nodes
Packit 8f70b4
   for(int j=0; j
Packit 8f70b4
      Node *n=nodes[j];
Packit 8f70b4
      if(!n->IsGood()) {
Packit 8f70b4
	 q_num++;
Packit 8f70b4
	 if(n->ping_timer.Stopped())
Packit 8f70b4
	    SendPing(n);
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   return q_num;
Packit 8f70b4
}
Packit 8f70b4
void DHT::RouteBucket::RemoveNode(Node *n)
Packit 8f70b4
{
Packit 8f70b4
   for(int j=0; j
Packit 8f70b4
      if(nodes[j]==n) {
Packit 8f70b4
	 RemoveNode(j);
Packit 8f70b4
	 break;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
void DHT::RouteBucket::RemoveNode(int i)
Packit 8f70b4
{
Packit 8f70b4
   assert(i>=0 && i
Packit 8f70b4
   nodes[i]->in_routes=false;
Packit 8f70b4
   nodes.remove(i);
Packit 8f70b4
}
Packit 8f70b4
bool DHT::RouteBucket::PrefixMatch(const xstring& id,int skew) const
Packit 8f70b4
{
Packit 8f70b4
   assert(skew>=0);
Packit 8f70b4
   int prefix_bits=this->prefix_bits-skew;
Packit 8f70b4
   if(prefix_bits<=0)
Packit 8f70b4
      return true;
Packit 8f70b4
   int bytes=prefix_bits/8;
Packit 8f70b4
   int bits=prefix_bits%8;
Packit 8f70b4
   unsigned mask=~((1<<(8-bits))-1);
Packit 8f70b4
   if(bytes>0 && memcmp(prefix.get(),id.get(),bytes))
Packit 8f70b4
      return false;
Packit 8f70b4
   if(bits>0 && (prefix[bytes]&mask)!=(id[bytes]&mask))
Packit 8f70b4
      return false;
Packit 8f70b4
   return true;
Packit 8f70b4
}
Packit 8f70b4
const char *DHT::RouteBucket::to_string() const
Packit 8f70b4
{
Packit 8f70b4
   xstring &buf=xstring::get_tmp("",0);
Packit 8f70b4
   prefix.hexdump_to(buf);
Packit 8f70b4
   buf.truncate((prefix_bits+3)/4);
Packit 8f70b4
   buf.append('/');
Packit 8f70b4
   buf.appendf("%d",prefix_bits);
Packit 8f70b4
   return buf;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::FindNodes(const xstring& target_id,xarray<Node*> &a,int max_count,bool only_good,const xmap<bool> *exclude)
Packit 8f70b4
{
Packit 8f70b4
   a.truncate();
Packit 8f70b4
   for(int skew=0; skew<160; skew++) {
Packit 8f70b4
      int i=FindRoute(target_id,0,skew);
Packit 8f70b4
      if(i<0)
Packit 8f70b4
	 continue;
Packit 8f70b4
      const xarray<Node*> &nodes=routes[i]->nodes;
Packit 8f70b4
      for(int j=0; j
Packit 8f70b4
	 if(!nodes[j]->IsBad() && (!only_good || nodes[j]->IsGood())
Packit 8f70b4
	 && nodes[j]->ping_lost_count<2 && a.search(nodes[j])==-1
Packit 8f70b4
	 && (!exclude || !exclude->exists(nodes[j]->id))) {
Packit 8f70b4
	    a.append(nodes[j]);
Packit 8f70b4
	    if(a.count()>=max_count)
Packit 8f70b4
	       return;
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
void DHT::AddPeer(const xstring& info_hash,const sockaddr_compact& a,bool seed)
Packit 8f70b4
{
Packit 8f70b4
   KnownTorrent *t=torrents.lookup(info_hash);
Packit 8f70b4
   if(!t) {
Packit 8f70b4
      if(torrents.count()>=MAX_TORRENTS) {
Packit 8f70b4
	 // remove random torrent
Packit 8f70b4
	 int r=random()/13%torrents.count();
Packit 8f70b4
	 int i=0;
Packit 8f70b4
	 for(torrents.each_begin(); ; torrents.each_next(), i++) {
Packit 8f70b4
	    if(i==r) {
Packit 8f70b4
	       torrents.remove(torrents.each_key());
Packit 8f70b4
	       break;
Packit 8f70b4
	    }
Packit 8f70b4
	 }
Packit 8f70b4
      }
Packit 8f70b4
      torrents.add(info_hash,t=new KnownTorrent());
Packit 8f70b4
   }
Packit 8f70b4
   t->AddPeer(new Peer(a,seed));
Packit 8f70b4
Packit 8f70b4
   sockaddr_u addr(a);
Packit 8f70b4
   LogNote(9,"added peer %s to torrent %s",addr.to_string(),info_hash.hexdump());
Packit 8f70b4
}
Packit 8f70b4
void DHT::KnownTorrent::AddPeer(Peer *peer)
Packit 8f70b4
{
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      if(peers[i]->compact_addr.eq(peer->compact_addr)) {
Packit 8f70b4
	 peers.remove(i);
Packit 8f70b4
	 break;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   if(peers.count()>=MAX_PEERS)
Packit 8f70b4
      peers.remove(0);
Packit 8f70b4
   peers.append(peer);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::MakeNodeId(xstring &id,const sockaddr_compact& ip,int r)
Packit 8f70b4
{
Packit 8f70b4
   // http://www.rasterbar.com/products/libtorrent/dht_sec.html
Packit 8f70b4
   static char v4mask[] = { 0x01, 0x07, 0x1f, 0x7f };
Packit 8f70b4
   static char v6mask[] = { 0x00, 0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f };
Packit 8f70b4
Packit 8f70b4
   char *mask=(ip.length()==4?v4mask:v6mask);
Packit 8f70b4
   int len=(ip.length()==4?sizeof(v4mask):sizeof(v6mask));
Packit 8f70b4
   int i;
Packit 8f70b4
Packit 8f70b4
   xstring seed;
Packit 8f70b4
   for(i=0; i
Packit 8f70b4
      seed.append(char(ip[i] & mask[i]));
Packit 8f70b4
Packit 8f70b4
   seed.append(char(r&7));
Packit 8f70b4
Packit 8f70b4
   Torrent::SHA1(seed,id);
Packit 8f70b4
Packit 8f70b4
   for(i=4; i<19; i++)
Packit 8f70b4
      id.get_non_const()[i] = random()/13;
Packit 8f70b4
   id.get_non_const()[19] = r;
Packit 8f70b4
}
Packit 8f70b4
bool DHT::ValidNodeId(const xstring& id,const sockaddr_compact& ip)
Packit 8f70b4
{
Packit 8f70b4
   if(id.length()!=20)
Packit 8f70b4
      return false;
Packit 8f70b4
Packit 8f70b4
   sockaddr_u addr(ip);
Packit 8f70b4
   if(!addr.family())
Packit 8f70b4
      return false;
Packit 8f70b4
Packit 8f70b4
   if(addr.is_loopback() || addr.is_private())
Packit 8f70b4
      return true;
Packit 8f70b4
Packit 8f70b4
   xstring id1;
Packit 8f70b4
   MakeNodeId(id1,ip,id[19]);
Packit 8f70b4
   return !memcmp(id,id1,4);
Packit 8f70b4
}
Packit 8f70b4
void DHT::Restart()
Packit 8f70b4
{
Packit 8f70b4
   ip_voted.empty();
Packit 8f70b4
   ip_votes.empty();
Packit 8f70b4
   routes.truncate();
Packit 8f70b4
   // re-add known nodes to route buckets
Packit 8f70b4
   for(Node *n=nodes.each_begin(); n; n=nodes.each_next()) {
Packit 8f70b4
      if(n->IsGood())
Packit 8f70b4
	 AddRoute(n);
Packit 8f70b4
   }
Packit 8f70b4
}
Packit 8f70b4
void DHT::Save(const SMTaskRef<IOBuffer>& buf)
Packit 8f70b4
{
Packit 8f70b4
   Enter(this);
Packit 8f70b4
   xmap_p<BeNode> state;
Packit 8f70b4
   state.add("id",new BeNode(node_id));
Packit 8f70b4
   int count=0;
Packit 8f70b4
   int responded_count=0;
Packit 8f70b4
   xstring compact_nodes;
Packit 8f70b4
   for(Node *n=nodes.each_begin(); n; n=nodes.each_next()) {
Packit 8f70b4
      if(n->IsGood() || n->in_routes) {
Packit 8f70b4
	 compact_nodes.append(n->id);
Packit 8f70b4
	 compact_nodes.append(n->addr.compact());
Packit 8f70b4
	 count++;
Packit 8f70b4
	 responded_count+=n->responded;
Packit 8f70b4
      }
Packit 8f70b4
   }
Packit 8f70b4
   LogNote(9,"saving state, %d nodes (%d responded)",count,responded_count);
Packit 8f70b4
   if(compact_nodes)
Packit 8f70b4
      state.add("nodes",new BeNode(compact_nodes));
Packit 8f70b4
   BeNode(&state).Pack(buf);
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      const RouteBucket *r=routes[i];
Packit 8f70b4
      LogNote(9,"route bucket %d: nodes count=%d prefix=%s",i,
Packit 8f70b4
	 (int)r->nodes.count(),r->to_string());
Packit 8f70b4
   }
Packit 8f70b4
   Leave();
Packit 8f70b4
}
Packit 8f70b4
void DHT::Load(const SMTaskRef<IOBuffer>& buf)
Packit 8f70b4
{
Packit 8f70b4
   int rest;
Packit 8f70b4
   Ref<BeNode> state(BeNode::Parse(buf->Get(),buf->Size(),&rest));
Packit 8f70b4
   if(!state || state->type!=BeNode::BE_DICT)
Packit 8f70b4
      return;
Packit 8f70b4
   const xstring& b_id=state->lookup_str("id");
Packit 8f70b4
   if(b_id.length()==20) {
Packit 8f70b4
      node_id.set(b_id);
Packit 8f70b4
      Restart();
Packit 8f70b4
   }
Packit 8f70b4
   const xstring& nodes=state->lookup_str("nodes");
Packit 8f70b4
   if(!nodes)
Packit 8f70b4
      return;
Packit 8f70b4
   const char *data=nodes;
Packit 8f70b4
   int len=nodes.length();
Packit 8f70b4
   const int addr_len=(af==AF_INET?6:18);
Packit 8f70b4
   const int node_len=20+addr_len;
Packit 8f70b4
   while(len>=node_len) {
Packit 8f70b4
      xstring id(data,20);
Packit 8f70b4
      sockaddr_u a;
Packit 8f70b4
      a.set_compact(data+20,addr_len);
Packit 8f70b4
      data+=node_len;
Packit 8f70b4
      len-=node_len;
Packit 8f70b4
      FoundNode(id,a,false);
Packit 8f70b4
   }
Packit 8f70b4
   // refresh routes after loading
Packit 8f70b4
   for(int i=0; i
Packit 8f70b4
      routes[i]->fresh_timer.StopDelayed(i*15+3);
Packit 8f70b4
}
Packit 8f70b4
void DHT::Save()
Packit 8f70b4
{
Packit 8f70b4
   if(!state_file)
Packit 8f70b4
      return;
Packit 8f70b4
   FileStream *f=new FileStream(state_file,O_WRONLY|O_TRUNC|O_CREAT);
Packit 8f70b4
   f->set_lock();
Packit 8f70b4
   f->set_create_mode(0600);
Packit 8f70b4
   f->dont_keep_backup();
Packit 8f70b4
   state_io=new IOBufferFDStream(f,IOBuffer::PUT);
Packit 8f70b4
   Save(state_io);
Packit 8f70b4
   state_io->PutEOF();
Packit 8f70b4
}
Packit 8f70b4
void DHT::Load()
Packit 8f70b4
{
Packit 8f70b4
   if(!state_file)
Packit 8f70b4
      return;
Packit 8f70b4
   FileStream *f=new FileStream(state_file,O_RDONLY);
Packit 8f70b4
   f->set_lock();
Packit 8f70b4
   state_io=new IOBufferFDStream(f,IOBuffer::GET);
Packit 8f70b4
   state_io->Roll();
Packit 8f70b4
   Roll();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void DHT::Reconfig(const char *name)
Packit 8f70b4
{
Packit 8f70b4
   rate_limit.Reconfig(name,"DHT");
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
bool DHT::BlackList::Listed(const sockaddr_u& addr)
Packit 8f70b4
{
Packit 8f70b4
   const xstring &key=addr.to_xstring();
Packit 8f70b4
   Timer *e=bl.lookup(key);
Packit 8f70b4
   if(!e)
Packit 8f70b4
      return false;
Packit 8f70b4
   if(e->Stopped()) {
Packit 8f70b4
      LogNote(4,"black-delisting node %s\n",key.get());
Packit 8f70b4
      bl.remove(key);
Packit 8f70b4
      return false;
Packit 8f70b4
   }
Packit 8f70b4
   return true;
Packit 8f70b4
}
Packit 8f70b4
void DHT::BlackList::Add(const sockaddr_u &a,const char *t)
Packit 8f70b4
{
Packit 8f70b4
   if(Listed(a))
Packit 8f70b4
      return;
Packit 8f70b4
   LogNote(4,"black-listing node %s (%s)\n",a.to_string(),t);
Packit 8f70b4
   bl.add(a.to_xstring(),new Timer(TimeIntervalR(t)));
Packit 8f70b4
}