/* * lftp - file transfer program * * Copyright (c) 1996-2016 by Alexander V. Lukyanov (lav@yars.free.net) * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include #include "trio.h" #ifdef TIME_WITH_SYS_TIME # include # include #endif #include "SMTask.h" #include "Timer.h" #include "misc.h" #ifdef TASK_DEBUG # define DEBUG(x) do{printf x;fflush(stdout);}while(0) #else # define DEBUG(x) do{}while(0) #endif xlist_head SMTask::all_tasks; xlist_head SMTask::ready_tasks; xlist_head SMTask::new_tasks; xlist_head SMTask::deleted_tasks; SMTask *SMTask::current; SMTask *SMTask::stack[SMTASK_MAX_DEPTH]; int SMTask::stack_ptr; PollVec SMTask::block; TimeDate SMTask::now; time_t SMTask::last_block; static SMTask *init_task=new SMTaskInit; SMTask::SMTask() : all_tasks_node(this), ready_tasks_node(this), new_tasks_node(this), deleted_tasks_node(this) { // insert in the chain all_tasks.add(all_tasks_node); suspended=false; suspended_slave=false; running=0; ref_count=0; deleting=false; new_tasks.add(new_tasks_node); DEBUG(("new SMTask %p (count=%d)\n",this,all_tasks.count())); } void SMTask::Suspend() { if(suspended) return; DEBUG(("Suspend(%p) from %p\n",this,current)); if(!IsSuspended()) SuspendInternal(); suspended=true; } void SMTask::Resume() { if(!suspended) return; suspended=false; if(!IsSuspended()) ResumeInternal(); } void SMTask::SuspendSlave() { if(suspended_slave) return; DEBUG(("SuspendSlave(%p) from %p\n",this,current)); if(!IsSuspended()) SuspendInternal(); suspended_slave=true; } void SMTask::ResumeSlave() { if(!suspended_slave) return; suspended_slave=false; if(!IsSuspended()) ResumeInternal(); } void SMTask::ResumeInternal() { if(!new_tasks_node.listed() && !ready_tasks_node.listed()) new_tasks.add_tail(new_tasks_node); } SMTask::~SMTask() { DEBUG(("delete SMTask %p (count=%d)\n",this,all_tasks.count())); assert(!running); assert(!ref_count); assert(deleting); if(ready_tasks_node.listed()) ready_tasks_node.remove(); if(new_tasks_node.listed()) new_tasks_node.remove(); assert(!deleted_tasks_node.listed()); // remove from the chain all_tasks_node.remove(); } void SMTask::DeleteLater() { if(deleting) return; deleting=true; deleted_tasks.add_tail(deleted_tasks_node); PrepareToDie(); } void SMTask::Delete(SMTask *task) { if(!task) return; task->DeleteLater(); // CollectGarbage will delete the task gracefully } SMTask *SMTask::_SetRef(SMTask *task,SMTask *new_task) { _DeleteRef(task); _MakeRef(new_task); return new_task; } void SMTask::Enter(SMTask *task) { assert(stack_ptrrunning++; } void SMTask::Leave(SMTask *task) { assert(current==task); current->running--; assert(stack_ptr>0); current=stack[--stack_ptr]; } int SMTask::Roll(SMTask *task) { int m=STALL; if(task->running || task->deleting) return m; Enter(task); while(!task->deleting && task->Do()==MOVED) m=MOVED; Leave(task); return m; } void SMTask::RollAll(const TimeInterval &max_time) { Timer limit_timer(max_time); do { Schedule(); } while(block.WillNotBlock() && !limit_timer.Stopped()); } int SMTask::CollectGarbage() { int count=0; xlist_for_each_safe(SMTask,deleted_tasks,node,task,next) { if(task->running || task->ref_count) continue; node->remove(); delete task; count++; } return count; } int SMTask::ScheduleThis() { assert(ready_tasks_node.listed()); if(running) return STALL; if(deleting || IsSuspended()) { ready_tasks_node.remove(); return STALL; } Enter(); // mark it current and running. int res=Do(); // let it run. Leave(); // unmark it running and change current. return res; } int SMTask::ScheduleNew() { int res=STALL; xlist_for_each_safe(SMTask,new_tasks,node,task,next) { task->new_tasks_node.remove(); ready_tasks.add(task->ready_tasks_node); SMTask *next_task=next->get_obj(); if(next_task) // protect next from deleting next_task->IncRefCount(); res|=task->ScheduleThis(); if(next_task) next_task->DecRefCount(); } return res; } void SMTask::Schedule() { block.Empty(); // get time once and assume Do() don't take much time UpdateNow(); timeval timer_timeout=Timer::GetTimeoutTV(); if(timer_timeout.tv_sec>=0) block.SetTimeout(timer_timeout); int res=ScheduleNew(); xlist_for_each_safe(SMTask,ready_tasks,node,task,next) { SMTask *next_task=next->get_obj(); if(next_task) // protect next from deleting next_task->IncRefCount(); res|=task->ScheduleThis(); res|=ScheduleNew(); // run just created tasks immediately if(next_task) next_task->DecRefCount(); } CollectGarbage(); if(res) block.NoWait(); } void SMTask::Block() { // use timer to force periodic select to find out which FDs are ready. if(block.WillNotBlock() && last_block==now.UnixTime()) return; block.Block(); last_block=now.UnixTime(); } int SMTaskInit::Do() { return STALL; } SMTaskInit::SMTaskInit() { Enter(); } SMTaskInit::~SMTaskInit() { Leave(); } int SMTask::TaskCount() { return all_tasks.count(); } void SMTask::Cleanup() { CollectGarbage(); Delete(init_task); CollectGarbage(); } #include #include "ResMgr.h" ResDecl enospc_fatal ("xfer:disk-full-fatal","no",ResMgr::BoolValidate,ResMgr::NoClosure); bool SMTask::NonFatalError(int err) { if(E_RETRY(err)) return true; current->TimeoutS(1); if(err==ENFILE || err==EMFILE) return true; #ifdef ENOBUFS if(err==ENOBUFS) return true; #endif #ifdef ENOSR if(err==ENOSR) return true; #endif #ifdef ENOSPC if(err==ENOSPC) return !enospc_fatal.QueryBool(0); #endif #ifdef EDQUOT if(err==EDQUOT) return !enospc_fatal.QueryBool(0); #endif current->Timeout(0); return false; /* fatal error */ } void SMTask::PrintTasks() { xlist_for_each(SMTask,all_tasks,node,scan) { const char *c=scan->GetLogContext(); if(!c) c=""; printf("%p\t%c%c%c\t%d\t%s\n",scan,scan->running?'R':' ', scan->suspended?'S':' ',scan->deleting?'D':' ',scan->ref_count,c); } }