ThreadWeaver
JobCollection.cpp
Go to the documentation of this file.
00001 /* -*- C++ -*- 00002 00003 This file implements the JobCollection class. 00004 00005 $ Author: Mirko Boehm $ 00006 $ Copyright: (C) 2004, 2005, 2006 Mirko Boehm $ 00007 $ Contact: mirko@kde.org 00008 http://www.kde.org 00009 http://www.hackerbuero.org $ 00010 00011 This library is free software; you can redistribute it and/or 00012 modify it under the terms of the GNU Library General Public 00013 License as published by the Free Software Foundation; either 00014 version 2 of the License, or (at your option) any later version. 00015 00016 This library is distributed in the hope that it will be useful, 00017 but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00019 Library General Public License for more details. 00020 00021 You should have received a copy of the GNU Library General Public License 00022 along with this library; see the file COPYING.LIB. If not, write to 00023 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00024 Boston, MA 02110-1301, USA. 00025 00026 $Id: DebuggingAids.h 30 2005-08-16 16:16:04Z mirko $ 00027 */ 00028 00029 #include "JobCollection.h" 00030 #include "JobCollection_p.h" 00031 00032 #include "WeaverInterface.h" 00033 #include "DebuggingAids.h" 00034 00035 #include <QtCore/QList> 00036 #include <QtCore/QObject> 00037 #include <QtCore/QPointer> 00038 00039 #include "DependencyPolicy.h" 00040 00041 using namespace ThreadWeaver; 00042 00043 JobCollectionJobRunner::JobCollectionJobRunner ( JobCollection* collection, Job* payload, QObject* parent ) 00044 : Job( parent ) 00045 , m_payload( payload ) 00046 , m_collection( collection ) 00047 { 00048 Q_ASSERT ( payload ); // will not accept zero jobs 00049 00050 if ( ! m_payload->objectName().isEmpty() ) 00051 { // this is most useful for debugging... 00052 setObjectName( tr( "JobRunner executing " ) + m_payload->objectName() ); 00053 } else { 00054 setObjectName( tr( "JobRunner (unnamed payload)" ) ); 00055 } 00056 } 00057 00058 bool JobCollectionJobRunner::canBeExecuted() 00059 { // the JobCollectionJobRunner object never have any dependencies: 00060 return m_payload->canBeExecuted(); 00061 } 00062 00063 Job* JobCollectionJobRunner::payload () 00064 { 00065 return m_payload; 00066 } 00067 00068 void JobCollectionJobRunner::aboutToBeQueued ( WeaverInterface *weaver ) 00069 { 00070 m_payload->aboutToBeQueued( weaver ); 00071 } 00072 00073 void JobCollectionJobRunner::aboutToBeDequeued ( WeaverInterface *weaver ) 00074 { 00075 m_payload->aboutToBeDequeued( weaver ); 00076 } 00077 00078 void JobCollectionJobRunner::execute ( Thread *t ) 00079 { 00080 if ( m_payload ) 00081 { 00082 m_payload->execute ( t ); 00083 m_collection->internalJobDone ( m_payload); 00084 } else { 00085 debug ( 1, "JobCollection: job in collection has been deleted." ); 00086 } 00087 Job::execute ( t ); 00088 } 00089 00090 int JobCollectionJobRunner::priority () const 00091 { 00092 return m_payload->priority(); 00093 } 00094 00095 void JobCollectionJobRunner::run () 00096 { 00097 } 00098 00099 class JobList : public QList <JobCollectionJobRunner*> {}; 00100 00101 class JobCollection::Private 00102 { 00103 public: 00104 00105 Private() 00106 : elements ( new JobList() ) 00107 , weaver ( 0 ) 00108 , jobCounter (0) 00109 {} 00110 00111 ~Private() 00112 { 00113 delete elements; 00114 } 00115 00116 /* The elements of the collection. */ 00117 JobList* elements; 00118 00119 /* The Weaver interface this collection is queued in. */ 00120 WeaverInterface *weaver; 00121 00122 /* Counter for the finished jobs. 00123 Set to the number of elements when started. 00124 When zero, all elements are done. 00125 */ 00126 int jobCounter; 00127 00128 QMutex mutex; 00129 }; 00130 00131 JobCollection::JobCollection ( QObject *parent ) 00132 : Job ( parent ) 00133 , d (new Private) 00134 { 00135 } 00136 00137 JobCollection::~JobCollection() 00138 { // dequeue all remaining jobs: 00139 if ( d->weaver != 0 ) // still queued 00140 dequeueElements(); 00141 // QObject cleanup takes care of the job runners 00142 delete d; 00143 } 00144 00145 void JobCollection::addJob ( Job *job ) 00146 { 00147 REQUIRE( d->weaver == 0 ); 00148 REQUIRE( job != 0); 00149 00150 JobCollectionJobRunner* runner = new JobCollectionJobRunner( this, job, this ); 00151 d->elements->append ( runner ); 00152 connect( runner , SIGNAL(done(ThreadWeaver::Job*)) , this , SLOT(jobRunnerDone()) ); 00153 } 00154 00155 void JobCollection::stop( Job *job ) 00156 { // this only works if there is an event queue executed by the main 00157 // thread, and it is not blocked: 00158 Q_UNUSED( job ); 00159 if ( d->weaver != 0 ) 00160 { 00161 debug( 4, "JobCollection::stop: dequeueing %p.\n", (void*)this); 00162 d->weaver->dequeue( this ); 00163 } 00164 // FIXME ENSURE ( d->weaver == 0 ); // verify that aboutToBeDequeued has been called 00165 } 00166 00167 void JobCollection::aboutToBeQueued ( WeaverInterface *weaver ) 00168 { 00169 REQUIRE ( d->weaver == 0 ); // never queue twice 00170 00171 d->weaver = weaver; 00172 00173 if ( d->elements->size() > 0 ) 00174 { 00175 d->elements->at( 0 )->aboutToBeQueued( weaver ); 00176 } 00177 00178 ENSURE(d->weaver != 0); 00179 } 00180 00181 void JobCollection::aboutToBeDequeued( WeaverInterface* weaver ) 00182 { // Q_ASSERT ( d->weaver != 0 ); 00183 // I thought: "must have been queued first" 00184 // but the user can queue and dequeue in a suspended Weaver 00185 00186 if ( d->weaver ) 00187 { 00188 dequeueElements(); 00189 00190 if ( !d->elements->isEmpty() ) 00191 { 00192 d->elements->at( 0 )->aboutToBeDequeued( weaver ); 00193 } 00194 } 00195 00196 d->weaver = 0; 00197 ENSURE ( d->weaver == 0 ); 00198 } 00199 00200 void JobCollection::execute ( Thread *t ) 00201 { 00202 REQUIRE ( d->weaver != 0); 00203 00204 // this is async, 00205 // JobTests::JobSignalsAreEmittedAsynchronouslyTest() proves it 00206 emit (started (this)); 00207 00208 if ( d->elements->isEmpty() ) 00209 { // we are just a regular, empty job (sob...): 00210 Job::execute( t ); 00211 return; 00212 } 00213 00214 { // d->elements is supposedly constant at this time, since we are 00215 // already queued 00216 // set job counter: 00217 QMutexLocker l ( & d->mutex ); 00218 d->jobCounter = d->elements->size(); 00219 00220 // queue elements: 00221 for (int index = 1; index < d->elements->size(); ++index) 00222 { 00223 d->weaver->enqueue (d->elements->at(index)); 00224 } 00225 } 00226 // this is a hack (but a good one): instead of queueing (this), we 00227 // execute the last job, to avoid to have (this) wait for an 00228 // available thread (the last operation does not get queued in 00229 // aboutToBeQueued() ) 00230 // NOTE: this also calls internalJobDone() 00231 d->elements->at( 0 )->execute ( t ); 00232 00233 // do not emit done, done is emitted when the last job called 00234 // internalJobDone() 00235 // also, do not free the queue policies yet, since not the whole job 00236 // is done 00237 } 00238 00239 Job* JobCollection::jobAt( int i ) 00240 { 00241 QMutexLocker l( &d->mutex ); 00242 REQUIRE ( i >= 0 && i < d->elements->size() ); 00243 return d->elements->at( i )->payload(); 00244 } 00245 00246 const int JobCollection::jobListLength() // const qualifier is possibly BiC? 00247 { 00248 QMutexLocker l( &d->mutex ); 00249 return d->elements->size(); 00250 } 00251 00252 bool JobCollection::canBeExecuted() 00253 { 00254 bool inheritedCanRun = true; 00255 00256 QMutexLocker l( &d->mutex ); 00257 00258 if ( d->elements->size() > 0 ) 00259 { 00260 inheritedCanRun = d->elements->at( 0 )->canBeExecuted(); 00261 } 00262 00263 return Job::canBeExecuted() && inheritedCanRun; 00264 } 00265 00266 void JobCollection::jobRunnerDone() 00267 { 00268 // Note: d->mutex must be unlocked before emitting the done() signal 00269 // because this JobCollection may be deleted by a slot connected to done() 00270 // in another thread 00271 bool emitDone = false; 00272 00273 { 00274 QMutexLocker l(&d->mutex); 00275 00276 if ( d->jobCounter == 0 ) 00277 { // there is a small chance that (this) has been dequeued in the 00278 // meantime, in this case, there is nothing left to clean up: 00279 d->weaver = 0; 00280 return; 00281 } 00282 00283 --d->jobCounter; 00284 00285 ENSURE (d->jobCounter >= 0); 00286 00287 if (d->jobCounter == 0) 00288 { 00289 if (! success()) 00290 { 00291 emit failed(this); 00292 } 00293 00294 finalCleanup(); 00295 emitDone = true; 00296 } 00297 } 00298 00299 if (emitDone) 00300 emit done(this); 00301 } 00302 void JobCollection::internalJobDone ( Job* job ) 00303 { 00304 REQUIRE( job != 0 ); 00305 Q_UNUSED (job); 00306 } 00307 00308 void JobCollection::finalCleanup() 00309 { 00310 freeQueuePolicyResources(); 00311 setFinished(true); 00312 d->weaver = 0; 00313 } 00314 00315 void JobCollection::dequeueElements() 00316 { 00317 // Note: d->mutex must be unlocked before emitting the done() signal 00318 // because this JobCollection may be deleted by a slot connected to done() in another 00319 // thread 00320 00321 bool emitDone = false; 00322 00323 { 00324 // dequeue everything: 00325 QMutexLocker l( &d->mutex ); 00326 00327 if ( d->weaver != 0 ) 00328 { 00329 for ( int index = 1; index < d->elements->size(); ++index ) 00330 { 00331 if ( d->elements->at( index ) && ! d->elements->at( index )->isFinished() ) // ... a QPointer 00332 { 00333 debug( 4, "JobCollection::dequeueElements: dequeueing %p.\n", 00334 (void*)d->elements->at( index ) ); 00335 d->weaver->dequeue ( d->elements->at( index ) ); 00336 } else { 00337 debug( 4, "JobCollection::dequeueElements: not dequeueing %p, already finished.\n", 00338 (void*)d->elements->at( index ) ); 00339 } 00340 } 00341 00342 if (d->jobCounter != 0) 00343 { // if jobCounter is not zero, then we where waiting for the 00344 // last job to finish before we would have freed our queue 00345 // policies, but in this case we have to do it here: 00346 finalCleanup(); 00347 } 00348 d->jobCounter = 0; 00349 } 00350 } 00351 if (emitDone) 00352 emit done(this); 00353 } 00354 00355 #include "JobCollection.moc" 00356 #include "JobCollection_p.moc"
KDE 4.6 API Reference