Blame src/SMTask.cc

Packit 8f70b4
/*
 * lftp - file transfer program
 *
 * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net)
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
Packit 8f70b4
Packit 8f70b4
#include <config.h>
Packit 8f70b4
#include <assert.h>
Packit 8f70b4
#include <time.h>
Packit 8f70b4
#include "trio.h"
Packit 8f70b4
#ifdef TIME_WITH_SYS_TIME
Packit 8f70b4
# include <sys/types.h>
Packit 8f70b4
# include <sys/time.h>
Packit 8f70b4
#endif
Packit 8f70b4
Packit 8f70b4
#include "SMTask.h"
Packit 8f70b4
#include "Timer.h"
Packit 8f70b4
#include "misc.h"
Packit 8f70b4
Packit 8f70b4
// Task tracing helper.  Call it with a doubly parenthesized printf argument
// list, e.g. DEBUG(("fmt %p\n",ptr)).  When TASK_DEBUG is defined the list
// is passed to printf and stdout is flushed; otherwise the macro expands
// to an empty statement and its arguments are not evaluated.
#ifdef TASK_DEBUG
Packit 8f70b4
# define DEBUG(x) do{printf x;fflush(stdout);}while(0)
Packit 8f70b4
#else
Packit 8f70b4
# define DEBUG(x) do{}while(0)
Packit 8f70b4
#endif
Packit 8f70b4
Packit 8f70b4
// List of every existing task: the SMTask constructor adds the task to it
// and the destructor removes it.
xlist_head<SMTask>  SMTask::all_tasks;
Packit 8f70b4
// Tasks that Schedule() iterates over; ScheduleNew() moves tasks here.
xlist_head<SMTask>  SMTask::ready_tasks;
Packit 8f70b4
// Freshly constructed or resumed tasks waiting for ScheduleNew().
xlist_head<SMTask>  SMTask::new_tasks;
Packit 8f70b4
// Tasks marked by DeleteLater(); CollectGarbage() destroys them once they
// are neither running nor referenced.
xlist_head<SMTask>  SMTask::deleted_tasks;
Packit 8f70b4
Packit 8f70b4
// The task most recently passed to Enter(); Leave() restores the previous one.
SMTask	 *SMTask::current;
Packit 8f70b4
Packit 8f70b4
// Saved values of `current' for nested Enter()/Leave() pairs, and the
// number of entries in use.
SMTask	 *SMTask::stack[SMTASK_MAX_DEPTH];
Packit 8f70b4
int	 SMTask::stack_ptr;
Packit 8f70b4
Packit 8f70b4
// Emptied at the start of every Schedule() pass and waited on by Block().
PollVec	 SMTask::block;
Packit 8f70b4
// Time value read through now.UnixTime() in Block().
// NOTE(review): presumably refreshed by UpdateNow() in Schedule() - confirm.
TimeDate SMTask::now;
Packit 8f70b4
// now.UnixTime() as of the last block.Block() call made by Block().
time_t	 SMTask::last_block;
Packit 8f70b4
Packit 8f70b4
// Created during static initialization (its constructor calls Enter())
// and handed to Delete() by Cleanup().
static SMTask *init_task=new SMTaskInit;
Packit 8f70b4
Packit 8f70b4
// Construct a task: bind every intrusive list node to this object, reset
// the state flags and counters, register the task in all_tasks and queue
// it in new_tasks so that ScheduleNew() picks it up.
SMTask::SMTask()
 : all_tasks_node(this),
   ready_tasks_node(this),
   new_tasks_node(this),
   deleted_tasks_node(this)
{
   // initial state: alive, unreferenced, idle and not suspended.
   deleting=false;
   ref_count=0;
   running=0;
   suspended_slave=false;
   suspended=false;

   // insert in the chain of all tasks and in the queue of new ones.
   all_tasks.add(all_tasks_node);
   new_tasks.add(new_tasks_node);

   DEBUG(("new SMTask %p (count=%d)\n",this,all_tasks.count()));
}
Packit 8f70b4
Packit 8f70b4
void  SMTask::Suspend()
Packit 8f70b4
{
Packit 8f70b4
   if(suspended)
Packit 8f70b4
      return;
Packit 8f70b4
   DEBUG(("Suspend(%p) from %p\n",this,current));
Packit 8f70b4
   if(!IsSuspended())
Packit 8f70b4
      SuspendInternal();
Packit 8f70b4
   suspended=true;
Packit 8f70b4
}
Packit 8f70b4
void  SMTask::Resume()
Packit 8f70b4
{
Packit 8f70b4
   if(!suspended)
Packit 8f70b4
      return;
Packit 8f70b4
   suspended=false;
Packit 8f70b4
   if(!IsSuspended())
Packit 8f70b4
      ResumeInternal();
Packit 8f70b4
}
Packit 8f70b4
void  SMTask::SuspendSlave()
Packit 8f70b4
{
Packit 8f70b4
   if(suspended_slave)
Packit 8f70b4
      return;
Packit 8f70b4
   DEBUG(("SuspendSlave(%p) from %p\n",this,current));
Packit 8f70b4
   if(!IsSuspended())
Packit 8f70b4
      SuspendInternal();
Packit 8f70b4
   suspended_slave=true;
Packit 8f70b4
}
Packit 8f70b4
void  SMTask::ResumeSlave()
Packit 8f70b4
{
Packit 8f70b4
   if(!suspended_slave)
Packit 8f70b4
      return;
Packit 8f70b4
   suspended_slave=false;
Packit 8f70b4
   if(!IsSuspended())
Packit 8f70b4
      ResumeInternal();
Packit 8f70b4
}
Packit 8f70b4
void SMTask::ResumeInternal()
Packit 8f70b4
{
Packit 8f70b4
   if(!new_tasks_node.listed() && !ready_tasks_node.listed())
Packit 8f70b4
      new_tasks.add_tail(new_tasks_node);
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
// Destroy a task.  The task must be idle, unreferenced, marked as being
// deleted and already taken out of deleted_tasks; it is unlinked from
// the scheduling lists and from all_tasks.
SMTask::~SMTask()
{
   DEBUG(("delete SMTask %p (count=%d)\n",this,all_tasks.count()));

   // preconditions
   assert(!running);
   assert(!ref_count);
   assert(deleting);

   // drop the task from whichever scheduling list still holds it.
   if(ready_tasks_node.listed())
   {
      ready_tasks_node.remove();
   }
   if(new_tasks_node.listed())
   {
      new_tasks_node.remove();
   }
   assert(!deleted_tasks_node.listed());

   // finally remove it from the chain of all tasks.
   all_tasks_node.remove();
}
Packit 8f70b4
Packit 8f70b4
void SMTask::DeleteLater()
Packit 8f70b4
{
Packit 8f70b4
   if(deleting)
Packit 8f70b4
      return;
Packit 8f70b4
   deleting=true;
Packit 8f70b4
   deleted_tasks.add_tail(deleted_tasks_node);
Packit 8f70b4
   PrepareToDie();
Packit 8f70b4
}
Packit 8f70b4
// Schedule `task' for deletion; a null pointer is accepted and ignored.
// The object is not destroyed here: CollectGarbage will delete the task
// gracefully.
void SMTask::Delete(SMTask *task)
{
   if(task)
      task->DeleteLater();
}
Packit 8f70b4
// Replace a reference: call _DeleteRef() on the old `task', then
// _MakeRef() on `new_task', and return `new_task'.
SMTask *SMTask::_SetRef(SMTask *task,SMTask *new_task)
{
   // the old reference is released before the new one is taken.
   _DeleteRef(task);
   _MakeRef(new_task);

   return new_task;
}
Packit 8f70b4
Packit 8f70b4
// Make `task' the current task.  The previous value of `current' is pushed
// on `stack' (Leave() pops it again) and the running counter of `task' is
// incremented.  `task' must not be null.
void SMTask::Enter(SMTask *task)
{
   // the stack has exactly SMTASK_MAX_DEPTH slots; refuse to overflow it.
   assert(stack_ptr<SMTASK_MAX_DEPTH);
   stack[stack_ptr++]=current;
   current=task;
   current->running++;
}
Packit 8f70b4
// Undo the matching Enter(): `task' must be the current task.  Its running
// counter is decremented and the previous current task is popped off
// `stack'.
void SMTask::Leave(SMTask *task)
{
   assert(task==current);
   --current->running;

   // there must be a saved entry to return to.
   assert(stack_ptr>0);
   stack_ptr--;
   current=stack[stack_ptr];
}
Packit 8f70b4
Packit 8f70b4
// Run `task' repeatedly for as long as its Do() returns MOVED and the task
// is not being deleted.  Returns MOVED if at least one Do() call returned
// MOVED, STALL otherwise.  A task that is already running or being deleted
// is not entered at all.
int SMTask::Roll(SMTask *task)
{
   if(task->running || task->deleting)
      return STALL;

   int result=STALL;
   Enter(task);
   for(;;)
   {
      if(task->deleting)
	 break;
      if(task->Do()!=MOVED)
	 break;
      result=MOVED;
   }
   Leave(task);
   return result;
}
Packit 8f70b4
Packit 8f70b4
void SMTask::RollAll(const TimeInterval &max_time)
Packit 8f70b4
{
Packit 8f70b4
   Timer limit_timer(max_time);
Packit 8f70b4
   do { Schedule(); }
Packit 8f70b4
   while(block.WillNotBlock() && !limit_timer.Stopped());
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
int SMTask::CollectGarbage()
Packit 8f70b4
{
Packit 8f70b4
   int count=0;
Packit 8f70b4
   xlist_for_each_safe(SMTask,deleted_tasks,node,task,next)
Packit 8f70b4
   {
Packit 8f70b4
      if(task->running || task->ref_count)
Packit 8f70b4
	 continue;
Packit 8f70b4
      node->remove();
Packit 8f70b4
      delete task;
Packit 8f70b4
      count++;
Packit 8f70b4
   }
Packit 8f70b4
   return count;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
int SMTask::ScheduleThis()
Packit 8f70b4
{
Packit 8f70b4
   assert(ready_tasks_node.listed());
Packit 8f70b4
   if(running)
Packit 8f70b4
      return STALL;
Packit 8f70b4
   if(deleting || IsSuspended())
Packit 8f70b4
   {
Packit 8f70b4
      ready_tasks_node.remove();
Packit 8f70b4
      return STALL;
Packit 8f70b4
   }
Packit 8f70b4
   Enter();	   // mark it current and running.
Packit 8f70b4
   int res=Do();   // let it run.
Packit 8f70b4
   Leave();	   // unmark it running and change current.
Packit 8f70b4
   return res;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
int SMTask::ScheduleNew()
Packit 8f70b4
{
Packit 8f70b4
   int res=STALL;
Packit 8f70b4
   xlist_for_each_safe(SMTask,new_tasks,node,task,next)
Packit 8f70b4
   {
Packit 8f70b4
      task->new_tasks_node.remove();
Packit 8f70b4
      ready_tasks.add(task->ready_tasks_node);
Packit 8f70b4
      SMTask *next_task=next->get_obj();
Packit 8f70b4
      if(next_task)  // protect next from deleting
Packit 8f70b4
	 next_task->IncRefCount();
Packit 8f70b4
      res|=task->ScheduleThis();
Packit 8f70b4
      if(next_task)
Packit 8f70b4
	 next_task->DecRefCount();
Packit 8f70b4
   }
Packit 8f70b4
   return res;
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void SMTask::Schedule()
Packit 8f70b4
{
Packit 8f70b4
   block.Empty();
Packit 8f70b4
Packit 8f70b4
   // get time once and assume Do() don't take much time
Packit 8f70b4
   UpdateNow();
Packit 8f70b4
Packit 8f70b4
   timeval timer_timeout=Timer::GetTimeoutTV();
Packit 8f70b4
   if(timer_timeout.tv_sec>=0)
Packit 8f70b4
      block.SetTimeout(timer_timeout);
Packit 8f70b4
Packit 8f70b4
   int res=ScheduleNew();
Packit 8f70b4
   xlist_for_each_safe(SMTask,ready_tasks,node,task,next)
Packit 8f70b4
   {
Packit 8f70b4
      SMTask *next_task=next->get_obj();
Packit 8f70b4
      if(next_task)  // protect next from deleting
Packit 8f70b4
	 next_task->IncRefCount();
Packit 8f70b4
      res|=task->ScheduleThis();
Packit 8f70b4
      res|=ScheduleNew(); // run just created tasks immediately
Packit 8f70b4
      if(next_task)
Packit 8f70b4
	 next_task->DecRefCount();
Packit 8f70b4
   }
Packit 8f70b4
   CollectGarbage();
Packit 8f70b4
   if(res)
Packit 8f70b4
      block.NoWait();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void SMTask::Block()
Packit 8f70b4
{
Packit 8f70b4
   // use timer to force periodic select to find out which FDs are ready.
Packit 8f70b4
   if(block.WillNotBlock() && last_block==now.UnixTime())
Packit 8f70b4
      return;
Packit 8f70b4
   block.Block();
Packit 8f70b4
   last_block=now.UnixTime();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
int SMTaskInit::Do()
Packit 8f70b4
{
Packit 8f70b4
   return STALL;
Packit 8f70b4
}
Packit 8f70b4
// Construct the init task and enter it right away; the matching Leave()
// is in the destructor.
// NOTE(review): the no-argument Enter() is declared elsewhere; presumably
// it is Enter(this) - confirm.
SMTaskInit::SMTaskInit()
{
   Enter();
}
Packit 8f70b4
// Leave the init task; pairs with the Enter() made by the constructor.
// NOTE(review): the no-argument Leave() is declared elsewhere; presumably
// it is Leave(this) - confirm.
SMTaskInit::~SMTaskInit()
{
   Leave();
}
Packit 8f70b4
Packit 8f70b4
int SMTask::TaskCount()
Packit 8f70b4
{
Packit 8f70b4
   return all_tasks.count();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void SMTask::Cleanup()
Packit 8f70b4
{
Packit 8f70b4
   CollectGarbage();
Packit 8f70b4
   Delete(init_task);
Packit 8f70b4
   CollectGarbage();
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
#include <errno.h>
Packit 8f70b4
#include "ResMgr.h"
Packit 8f70b4
// Setting "xfer:disk-full-fatal" with the default value "no", validated by
// ResMgr::BoolValidate.  NonFatalError() queries it to decide whether
// ENOSPC and EDQUOT are fatal.
ResDecl enospc_fatal ("xfer:disk-full-fatal","no",ResMgr::BoolValidate,ResMgr::NoClosure);
Packit 8f70b4
bool SMTask::NonFatalError(int err)
Packit 8f70b4
{
Packit 8f70b4
   if(E_RETRY(err))
Packit 8f70b4
      return true;
Packit 8f70b4
Packit 8f70b4
   current->TimeoutS(1);
Packit 8f70b4
   if(err==ENFILE || err==EMFILE)
Packit 8f70b4
      return true;
Packit 8f70b4
#ifdef ENOBUFS
Packit 8f70b4
   if(err==ENOBUFS)
Packit 8f70b4
      return true;
Packit 8f70b4
#endif
Packit 8f70b4
#ifdef ENOSR
Packit 8f70b4
   if(err==ENOSR)
Packit 8f70b4
      return true;
Packit 8f70b4
#endif
Packit 8f70b4
#ifdef ENOSPC
Packit 8f70b4
   if(err==ENOSPC)
Packit 8f70b4
      return !enospc_fatal.QueryBool(0);
Packit 8f70b4
#endif
Packit 8f70b4
#ifdef EDQUOT
Packit 8f70b4
   if(err==EDQUOT)
Packit 8f70b4
      return !enospc_fatal.QueryBool(0);
Packit 8f70b4
#endif
Packit 8f70b4
Packit 8f70b4
   current->Timeout(0);
Packit 8f70b4
   return false; /* fatal error */
Packit 8f70b4
}
Packit 8f70b4
Packit 8f70b4
void SMTask::PrintTasks()
Packit 8f70b4
{
Packit 8f70b4
   xlist_for_each(SMTask,all_tasks,node,scan)
Packit 8f70b4
   {
Packit 8f70b4
      const char *c=scan->GetLogContext();
Packit 8f70b4
      if(!c) c="";
Packit 8f70b4
      printf("%p\t%c%c%c\t%d\t%s\n",scan,scan->running?'R':' ',
Packit 8f70b4
	 scan->suspended?'S':' ',scan->deleting?'D':' ',scan->ref_count,c);
Packit 8f70b4
   }
Packit 8f70b4
}