ThreadWeaver
WeaverImpl.cpp
Go to the documentation of this file.
00001 /* -*- C++ -*- 00002 00003 This file implements the WeaverImpl class. 00004 00005 00006 $ Author: Mirko Boehm $ 00007 $ Copyright: (C) 2005, 2006 Mirko Boehm $ 00008 $ Contact: mirko@kde.org 00009 http://www.kde.org 00010 http://www.hackerbuero.org $ 00011 00012 This library is free software; you can redistribute it and/or 00013 modify it under the terms of the GNU Library General Public 00014 License as published by the Free Software Foundation; either 00015 version 2 of the License, or (at your option) any later version. 00016 00017 This library is distributed in the hope that it will be useful, 00018 but WITHOUT ANY WARRANTY; without even the implied warranty of 00019 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00020 Library General Public License for more details. 00021 00022 You should have received a copy of the GNU Library General Public License 00023 along with this library; see the file COPYING.LIB. If not, write to 00024 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00025 Boston, MA 02110-1301, USA. 00026 00027 $Id: WeaverImpl.cpp 30 2005-08-16 16:16:04Z mirko $ 00028 00029 */ 00030 00031 #include "WeaverImpl.h" 00032 00033 #include <QtCore/QObject> 00034 #include <QtCore/QMutex> 00035 #include <QtCore/QDebug> 00036 00037 #include "Job.h" 00038 #include "State.h" 00039 #include "Thread.h" 00040 #include "ThreadWeaver.h" 00041 #include "DebuggingAids.h" 00042 #include "WeaverObserver.h" 00043 #include "SuspendedState.h" 00044 #include "SuspendingState.h" 00045 #include "DestructedState.h" 00046 #include "WorkingHardState.h" 00047 #include "ShuttingDownState.h" 00048 #include "InConstructionState.h" 00049 00050 using namespace ThreadWeaver; 00051 00052 WeaverImpl::WeaverImpl( QObject* parent ) 00053 : WeaverInterface(parent) 00054 , m_active(0) 00055 , m_inventoryMax( 4 ) 00056 , m_mutex ( new QMutex( QMutex::Recursive ) ) 00057 , m_finishMutex( new QMutex ) 00058 , m_jobAvailableMutex ( new QMutex ) 00059 , m_state (0) 00060 { 00061 // initialize state objects: 00062 m_states[InConstruction] = new InConstructionState( this ); 00063 setState ( InConstruction ); 00064 m_states[WorkingHard] = new WorkingHardState( this ); 00065 m_states[Suspending] = new SuspendingState( this ); 00066 m_states[Suspended] = new SuspendedState( this ); 00067 m_states[ShuttingDown] = new ShuttingDownState( this ); 00068 m_states[Destructed] = new DestructedState( this ); 00069 00070 // FIXME (0.7) this is supposedly unnecessary 00071 connect ( this, SIGNAL ( asyncThreadSuspended( ThreadWeaver::Thread* ) ), 00072 SIGNAL ( threadSuspended( ThreadWeaver::Thread* ) ), 00073 Qt::QueuedConnection ); 00074 setState( WorkingHard ); 00075 } 00076 00077 WeaverImpl::~WeaverImpl() 00078 { // the constructor may only be called from the thread that owns this 00079 // object (everything else would be what we professionals call "insane") 00080 REQUIRE( QThread::currentThread() == thread() ); 00081 debug ( 3, "WeaverImpl dtor: destroying inventory.\n" ); 00082 setState ( ShuttingDown ); 00083 00084 m_jobAvailable.wakeAll(); 00085 00086 // problem: Some threads might not be asleep yet, just finding 00087 // out if a job is available. Those threads will suspend 00088 // waiting for their next job (a rare case, but not impossible). 00089 // Therefore, if we encounter a thread that has not exited, we 00090 // have to wake it again (which we do in the following for 00091 // loop). 00092 00093 while (!m_inventory.isEmpty()) 00094 { 00095 Thread* th=m_inventory.takeFirst(); 00096 if ( !th->isFinished() ) 00097 { 00098 for ( ;; ) 00099 { 00100 m_jobAvailable.wakeAll(); 00101 if ( th->wait( 100 ) ) break; 00102 debug ( 1, "WeaverImpl::~WeaverImpl: thread %i did not exit as expected, " 00103 "retrying.\n", th->id() ); 00104 } 00105 } 00106 emit ( threadExited ( th ) ); 00107 delete th; 00108 } 00109 00110 m_inventory.clear(); 00111 delete m_mutex; 00112 delete m_finishMutex; 00113 delete m_jobAvailableMutex; 00114 debug ( 3, "WeaverImpl dtor: done\n" ); 00115 setState ( Destructed ); // m_state = Halted; 00116 // FIXME: delete state objects. what sense does DestructedState make then? 00117 // FIXME: make state objects static, since they are 00118 } 00119 00120 void WeaverImpl::setState ( StateId id ) 00121 { 00122 if ( m_state==0 || m_state->stateId() != id ) 00123 { 00124 m_state = m_states[id]; 00125 debug ( 2, "WeaverImpl::setState: state changed to \"%s\".\n", 00126 m_state->stateName().toAscii().constData() ); 00127 if ( id == Suspended ) 00128 { 00129 emit ( suspended() ); 00130 } 00131 00132 m_state->activated(); 00133 00134 emit ( stateChanged ( m_state ) ); 00135 } 00136 } 00137 00138 const State& WeaverImpl::state() const 00139 { 00140 return *m_state; 00141 } 00142 00143 void WeaverImpl::setMaximumNumberOfThreads( int cap ) 00144 { 00145 Q_ASSERT_X ( cap > 0, "Weaver Impl", "Thread inventory size has to be larger than zero." ); 00146 QMutexLocker l (m_mutex); 00147 m_inventoryMax = cap; 00148 } 00149 00150 int WeaverImpl::maximumNumberOfThreads() const 00151 { 00152 QMutexLocker l (m_mutex); 00153 return m_inventoryMax; 00154 } 00155 00156 int WeaverImpl::currentNumberOfThreads () const 00157 { 00158 QMutexLocker l (m_mutex); 00159 return m_inventory.count (); 00160 } 00161 00162 void WeaverImpl::registerObserver ( WeaverObserver *ext ) 00163 { 00164 connect ( this, SIGNAL ( stateChanged ( ThreadWeaver::State* ) ), 00165 ext, SIGNAL ( weaverStateChanged ( ThreadWeaver::State* ) ) ); 00166 connect ( this, SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ), 00167 ext, SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ) ); 00168 connect ( this, SIGNAL ( threadBusy( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ), 00169 ext, SIGNAL ( threadBusy ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ) ); 00170 connect ( this, SIGNAL ( threadSuspended ( ThreadWeaver::Thread* ) ), 00171 ext, SIGNAL ( threadSuspended ( ThreadWeaver::Thread* ) ) ); 00172 connect ( this, SIGNAL ( threadExited ( ThreadWeaver::Thread* ) ) , 00173 ext, SIGNAL ( threadExited ( ThreadWeaver::Thread* ) ) ); 00174 } 00175 00176 void WeaverImpl::enqueue(Job* job) 00177 { 00178 adjustInventory ( 1 ); 00179 if (job) 00180 { 00181 debug ( 3, "WeaverImpl::enqueue: queueing job %p of type %s.\n", 00182 (void*)job, job->metaObject()->className() ); 00183 QMutexLocker l (m_mutex); 00184 job->aboutToBeQueued ( this ); 00185 // find positiEon for insertion:; 00186 // FIXME (after 0.6) optimize: factor out queue management into own class, 00187 // and use binary search for insertion (not done yet because 00188 // refactoring already planned): 00189 int i = m_assignments.size(); 00190 if (i > 0) 00191 { 00192 while ( i > 0 && m_assignments.at(i - 1)->priority() < job->priority() ) --i; 00193 m_assignments.insert( i, (job) ); 00194 } else { 00195 m_assignments.append (job); 00196 } 00197 assignJobs(); 00198 } 00199 } 00200 00201 void WeaverImpl::adjustInventory ( int numberOfNewJobs ) 00202 { 00203 QMutexLocker l (m_mutex); 00204 00205 // no of threads that can be created: 00206 const int reserve = m_inventoryMax - m_inventory.count(); 00207 00208 if ( reserve > 0 ) 00209 { 00210 for ( int i = 0; i < qMin ( reserve, numberOfNewJobs ); ++i ) 00211 { 00212 Thread *th = createThread(); 00213 th->moveToThread( th ); // be sane from the start 00214 m_inventory.append(th); 00215 connect ( th, SIGNAL ( jobStarted ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ), 00216 SIGNAL ( threadBusy ( ThreadWeaver::Thread*, ThreadWeaver::Job* ) ) ); 00217 connect ( th, SIGNAL ( jobDone( ThreadWeaver::Job* ) ), 00218 SIGNAL ( jobDone( ThreadWeaver::Job* ) ) ); 00219 connect ( th, SIGNAL ( started ( ThreadWeaver::Thread* ) ), 00220 SIGNAL ( threadStarted ( ThreadWeaver::Thread* ) ) ); 00221 00222 th->start (); 00223 debug ( 2, "WeaverImpl::adjustInventory: thread created, " 00224 "%i threads in inventory.\n", currentNumberOfThreads() ); 00225 } 00226 } 00227 } 00228 00229 Thread* WeaverImpl::createThread() 00230 { 00231 return new Thread( this ); 00232 } 00233 00234 bool WeaverImpl::dequeue ( Job* job ) 00235 { 00236 bool result; 00237 { 00238 QMutexLocker l (m_mutex); 00239 00240 int i = m_assignments.indexOf ( job ); 00241 if ( i != -1 ) 00242 { 00243 job->aboutToBeDequeued( this ); 00244 00245 m_assignments.removeAt( i ); 00246 result = true; 00247 debug( 3, "WeaverImpl::dequeue: job %p dequeued, %i jobs left.\n", 00248 (void*)job, m_assignments.size() ); 00249 } else { 00250 debug( 3, "WeaverImpl::dequeue: job %p not found in queue.\n", (void*)job ); 00251 result = false; 00252 } 00253 } 00254 00255 // from the queues point of view, a job is just as finished if 00256 // it gets dequeued: 00257 m_jobFinished.wakeOne(); 00258 return result; 00259 } 00260 00261 void WeaverImpl::dequeue () 00262 { 00263 debug( 3, "WeaverImpl::dequeue: dequeueing all jobs.\n" ); 00264 QMutexLocker l (m_mutex); 00265 for ( int index = 0; index < m_assignments.size(); ++index ) 00266 { 00267 m_assignments.at( index )->aboutToBeDequeued( this ); 00268 } 00269 m_assignments.clear(); 00270 00271 ENSURE ( m_assignments.isEmpty() ); 00272 } 00273 00274 void WeaverImpl::suspend () 00275 { 00276 m_state->suspend(); 00277 } 00278 00279 void WeaverImpl::resume ( ) 00280 { 00281 m_state->resume(); 00282 } 00283 00284 void WeaverImpl::assignJobs() 00285 { 00286 m_jobAvailable.wakeAll(); 00287 } 00288 00289 bool WeaverImpl::isEmpty() const 00290 { 00291 QMutexLocker l (m_mutex); 00292 return m_assignments.isEmpty(); 00293 } 00294 00295 00296 void WeaverImpl::incActiveThreadCount() 00297 { 00298 adjustActiveThreadCount ( 1 ); 00299 } 00300 00301 void WeaverImpl::decActiveThreadCount() 00302 { 00303 adjustActiveThreadCount ( -1 ); 00304 // the done job could have freed another set of jobs, and we do not know how 00305 // many - therefore we need to wake all threads: 00306 m_jobFinished.wakeAll(); 00307 } 00308 00309 void WeaverImpl::adjustActiveThreadCount( int diff ) 00310 { 00311 QMutexLocker l (m_mutex); 00312 m_active += diff; 00313 debug ( 4, "WeaverImpl::adjustActiveThreadCount: %i active threads (%i jobs" 00314 " in queue).\n", m_active, queueLength() ); 00315 00316 if ( m_assignments.isEmpty() && m_active == 0) 00317 { 00318 P_ASSERT ( diff < 0 ); // cannot reach Zero otherwise 00319 emit ( finished() ); 00320 } 00321 } 00322 00323 int WeaverImpl::activeThreadCount() 00324 { 00325 QMutexLocker l (m_mutex); 00326 return m_active; 00327 } 00328 00329 Job* WeaverImpl::takeFirstAvailableJob() 00330 { 00331 QMutexLocker l (m_mutex); 00332 Job *next = 0; 00333 for (int index = 0; index < m_assignments.size(); ++index) 00334 { 00335 if ( m_assignments.at(index)->canBeExecuted() ) 00336 { 00337 next = m_assignments.at(index); 00338 m_assignments.removeAt (index); 00339 break; 00340 } 00341 } 00342 return next; 00343 } 00344 00345 Job* WeaverImpl::applyForWork(Thread *th, Job* previous) 00346 { 00347 if (previous) 00348 { // cleanup and send events: 00349 decActiveThreadCount(); 00350 } 00351 return m_state->applyForWork ( th, 0 ); 00352 } 00353 00354 void WeaverImpl::waitForAvailableJob(Thread* th) 00355 { 00356 m_state->waitForAvailableJob ( th ); 00357 } 00358 00359 void WeaverImpl::blockThreadUntilJobsAreBeingAssigned ( Thread *th ) 00360 { // th is the thread that calls this method: 00361 Q_UNUSED ( th ); 00362 debug ( 4, "WeaverImpl::blockThread...: thread %i blocked.\n", th->id()); 00363 emit asyncThreadSuspended ( th ); 00364 QMutexLocker l( m_jobAvailableMutex ); 00365 m_jobAvailable.wait( m_jobAvailableMutex ); 00366 debug ( 4, "WeaverImpl::blockThread...: thread %i resumed.\n", th->id()); 00367 } 00368 00369 int WeaverImpl::queueLength() const 00370 { 00371 QMutexLocker l (m_mutex); 00372 return m_assignments.count(); 00373 } 00374 00375 bool WeaverImpl::isIdle () const 00376 { 00377 QMutexLocker l (m_mutex); 00378 return isEmpty() && m_active == 0; 00379 } 00380 00381 void WeaverImpl::finish() 00382 { 00383 #ifdef QT_NO_DEBUG 00384 const int MaxWaitMilliSeconds = 200; 00385 #else 00386 const int MaxWaitMilliSeconds = 2000; 00387 #endif 00388 00389 while ( !isIdle() ) 00390 { 00391 debug (2, "WeaverImpl::finish: not done, waiting.\n" ); 00392 QMutexLocker l( m_finishMutex ); 00393 if ( m_jobFinished.wait( m_finishMutex, MaxWaitMilliSeconds ) == false ) 00394 { 00395 debug ( 2, "WeaverImpl::finish: wait timed out, %i jobs left, waking threads.\n", 00396 queueLength() ); 00397 m_jobAvailable.wakeAll(); 00398 } 00399 } 00400 debug (2, "WeaverImpl::finish: done.\n\n\n" ); 00401 } 00402 00403 void WeaverImpl::requestAbort() 00404 { 00405 QMutexLocker l (m_mutex); 00406 for ( int i = 0; i<m_inventory.size(); ++i ) 00407 { 00408 m_inventory[i]->requestAbort(); 00409 } 00410 } 00411 00412 void WeaverImpl::dumpJobs() 00413 { 00414 QMutexLocker l (m_mutex); 00415 debug( 0, "WeaverImpl::dumpJobs: current jobs:\n" ); 00416 for ( int index = 0; index < m_assignments.size(); ++index ) 00417 { 00418 debug( 0, "--> %4i: %p %s (priority %i)\n", index, (void*)m_assignments.at( index ), 00419 m_assignments.at( index )->metaObject()->className(), 00420 m_assignments.at(index)->priority() ); 00421 } 00422 } 00423 00424 #include "WeaverImpl.moc"
KDE 4.6 API Reference