KIO
scheduler.cpp
Go to the documentation of this file.
00001 /* This file is part of the KDE libraries 00002 Copyright (C) 2000 Stephan Kulow <coolo@kde.org> 00003 Waldo Bastian <bastian@kde.org> 00004 Copyright (C) 2009, 2010 Andreas Hartmetz <ahartmetz@gmail.com> 00005 00006 This library is free software; you can redistribute it and/or 00007 modify it under the terms of the GNU Library General Public 00008 License version 2 as published by the Free Software Foundation. 00009 00010 This library is distributed in the hope that it will be useful, 00011 but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00013 Library General Public License for more details. 00014 00015 You should have received a copy of the GNU Library General Public License 00016 along with this library; see the file COPYING.LIB. If not, write to 00017 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00018 Boston, MA 02110-1301, USA. 00019 */ 00020 00021 #include "scheduler.h" 00022 #include "scheduler_p.h" 00023 00024 #include "sessiondata.h" 00025 #include "slaveconfig.h" 00026 #include "authinfo.h" 00027 #include "slave.h" 00028 #include "connection.h" 00029 #include "job_p.h" 00030 00031 #include <kdebug.h> 00032 #include <kprotocolmanager.h> 00033 #include <kprotocolinfo.h> 00034 #include <assert.h> 00035 00036 #include <QtCore/QHash> 00037 #include <QtGui/QWidget> 00038 #include <QtDBus/QtDBus> 00039 00040 // Slaves may be idle for a certain time (3 minutes) before they are killed. 00041 static const int s_idleSlaveLifetime = 3 * 60; 00042 00043 00044 using namespace KIO; 00045 00046 #ifndef KDE_USE_FINAL // already defined in job.cpp 00047 static inline Slave *jobSlave(SimpleJob *job) 00048 { 00049 return SimpleJobPrivate::get(job)->m_slave; 00050 } 00051 #endif 00052 00053 static inline int jobCommand(SimpleJob *job) 00054 { 00055 return SimpleJobPrivate::get(job)->m_command; 00056 } 00057 00058 static inline void startJob(SimpleJob *job, Slave *slave) 00059 { 00060 SimpleJobPrivate::get(job)->start(slave); 00061 } 00062 00063 // here be uglies 00064 // forward declaration to break cross-dependency of SlaveKeeper and SchedulerPrivate 00065 static void setupSlave(KIO::Slave *slave, const KUrl &url, const QString &protocol, 00066 const QStringList &proxyList, bool newSlave, const KIO::MetaData *config = 0); 00067 // same reason as above 00068 static Scheduler *scheduler(); 00069 static Slave *heldSlaveForJob(SimpleJob *job); 00070 00071 00072 int SerialPicker::changedPrioritySerial(int oldSerial, int newPriority) const 00073 { 00074 Q_ASSERT(newPriority >= -10 && newPriority <= 10); 00075 newPriority = qBound(-10, newPriority, 10); 00076 int unbiasedSerial = oldSerial % m_jobsPerPriority; 00077 return unbiasedSerial + newPriority * m_jobsPerPriority; 00078 } 00079 00080 00081 SlaveKeeper::SlaveKeeper() 00082 { 00083 m_grimTimer.setSingleShot(true); 00084 connect (&m_grimTimer, SIGNAL(timeout()), SLOT(grimReaper())); 00085 } 00086 00087 void SlaveKeeper::returnSlave(Slave *slave) 00088 { 00089 Q_ASSERT(slave); 00090 slave->setIdle(); 00091 m_idleSlaves.insert(slave->host(), slave); 00092 scheduleGrimReaper(); 00093 } 00094 00095 Slave *SlaveKeeper::takeSlaveForJob(SimpleJob *job) 00096 { 00097 Slave *slave = heldSlaveForJob(job); 00098 if (slave) { 00099 return slave; 00100 } 00101 00102 KUrl url = SimpleJobPrivate::get(job)->m_url; 00103 // TODO take port, username and password into account 00104 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.find(url.host()); 00105 if (it == m_idleSlaves.end()) { 00106 it = m_idleSlaves.begin(); 00107 } 00108 if (it == m_idleSlaves.end()) { 00109 return 0; 00110 } 00111 slave = it.value(); 00112 m_idleSlaves.erase(it); 00113 return slave; 00114 } 00115 00116 bool SlaveKeeper::removeSlave(Slave *slave) 00117 { 00118 // ### performance not so great 00119 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.begin(); 00120 for (; it != m_idleSlaves.end(); ++it) { 00121 if (it.value() == slave) { 00122 m_idleSlaves.erase(it); 00123 return true; 00124 } 00125 } 00126 return false; 00127 } 00128 00129 QList<Slave *> SlaveKeeper::allSlaves() const 00130 { 00131 return m_idleSlaves.values(); 00132 } 00133 00134 void SlaveKeeper::scheduleGrimReaper() 00135 { 00136 if (!m_grimTimer.isActive()) { 00137 m_grimTimer.start((s_idleSlaveLifetime / 2) * 1000); 00138 } 00139 } 00140 00141 //private slot 00142 void SlaveKeeper::grimReaper() 00143 { 00144 QMultiHash<QString, Slave *>::Iterator it = m_idleSlaves.begin(); 00145 while (it != m_idleSlaves.end()) { 00146 Slave *slave = it.value(); 00147 if (slave->idleTime() >= s_idleSlaveLifetime) { 00148 it = m_idleSlaves.erase(it); 00149 if (slave->job()) { 00150 kDebug (7006) << "Idle slave" << slave << "still has job" << slave->job(); 00151 } 00152 slave->kill(); 00153 // avoid invoking slotSlaveDied() because its cleanup services are not needed 00154 slave->deref(); 00155 } else { 00156 ++it; 00157 } 00158 } 00159 if (!m_idleSlaves.isEmpty()) { 00160 scheduleGrimReaper(); 00161 } 00162 } 00163 00164 00165 int HostQueue::lowestSerial() const 00166 { 00167 QMap<int, SimpleJob*>::ConstIterator first = m_queuedJobs.constBegin(); 00168 if (first != m_queuedJobs.constEnd()) { 00169 return first.key(); 00170 } 00171 return SerialPicker::maxSerial; 00172 } 00173 00174 void HostQueue::queueJob(SimpleJob *job) 00175 { 00176 const int serial = SimpleJobPrivate::get(job)->m_schedSerial; 00177 Q_ASSERT(serial != 0); 00178 Q_ASSERT(!m_queuedJobs.contains(serial)); 00179 Q_ASSERT(!m_runningJobs.contains(job)); 00180 m_queuedJobs.insert(serial, job); 00181 } 00182 00183 SimpleJob *HostQueue::takeFirstInQueue() 00184 { 00185 Q_ASSERT(!m_queuedJobs.isEmpty()); 00186 QMap<int, SimpleJob *>::iterator first = m_queuedJobs.begin(); 00187 SimpleJob *job = first.value(); 00188 m_queuedJobs.erase(first); 00189 m_runningJobs.insert(job); 00190 return job; 00191 } 00192 00193 bool HostQueue::removeJob(SimpleJob *job) 00194 { 00195 const int serial = SimpleJobPrivate::get(job)->m_schedSerial; 00196 if (m_runningJobs.remove(job)) { 00197 Q_ASSERT(!m_queuedJobs.contains(serial)); 00198 return true; 00199 } 00200 if (m_queuedJobs.remove(serial)) { 00201 return true; 00202 } 00203 return false; 00204 } 00205 00206 QList<Slave *> HostQueue::allSlaves() const 00207 { 00208 QList<Slave *> ret; 00209 Q_FOREACH (SimpleJob *job, m_runningJobs) { 00210 Slave *slave = jobSlave(job); 00211 Q_ASSERT(slave); 00212 ret.append(slave); 00213 } 00214 return ret; 00215 } 00216 00217 00218 00219 ConnectedSlaveQueue::ConnectedSlaveQueue() 00220 { 00221 m_startJobsTimer.setSingleShot(true); 00222 connect (&m_startJobsTimer, SIGNAL(timeout()), SLOT(startRunnableJobs())); 00223 } 00224 00225 bool ConnectedSlaveQueue::queueJob(SimpleJob *job, Slave *slave) 00226 { 00227 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 00228 if (it == m_connectedSlaves.end()) { 00229 return false; 00230 } 00231 SimpleJobPrivate::get(job)->m_slave = slave; 00232 00233 PerSlaveQueue &jobs = it.value(); 00234 jobs.waitingList.append(job); 00235 if (!jobs.runningJob) { 00236 // idle slave now has a job to run 00237 m_runnableSlaves.insert(slave); 00238 m_startJobsTimer.start(); 00239 } 00240 return true; 00241 } 00242 00243 bool ConnectedSlaveQueue::removeJob(SimpleJob *job) 00244 { 00245 Slave *slave = jobSlave(job); 00246 Q_ASSERT(slave); 00247 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 00248 if (it == m_connectedSlaves.end()) { 00249 return false; 00250 } 00251 PerSlaveQueue &jobs = it.value(); 00252 if (jobs.runningJob || jobs.waitingList.isEmpty()) { 00253 // a slave that was busy running a job was not runnable. 00254 // a slave that has no waiting job(s) was not runnable either. 00255 Q_ASSERT(!m_runnableSlaves.contains(slave)); 00256 } 00257 00258 const bool removedRunning = jobs.runningJob == job; 00259 const bool removedWaiting = jobs.waitingList.removeAll(job) != 0; 00260 if (removedRunning) { 00261 jobs.runningJob = 0; 00262 Q_ASSERT(!removedWaiting); 00263 } 00264 const bool removedTheJob = removedRunning || removedWaiting; 00265 00266 if (!slave->isAlive()) { 00267 removeSlave(slave); 00268 return removedTheJob; 00269 } 00270 00271 if (removedRunning && jobs.waitingList.count()) { 00272 m_runnableSlaves.insert(slave); 00273 m_startJobsTimer.start(); 00274 } 00275 if (removedWaiting && jobs.waitingList.isEmpty()) { 00276 m_runnableSlaves.remove(slave); 00277 } 00278 return removedTheJob; 00279 } 00280 00281 void ConnectedSlaveQueue::addSlave(Slave *slave) 00282 { 00283 Q_ASSERT(slave); 00284 if (!m_connectedSlaves.contains(slave)) { 00285 m_connectedSlaves.insert(slave, PerSlaveQueue()); 00286 } 00287 } 00288 00289 bool ConnectedSlaveQueue::removeSlave(Slave *slave) 00290 { 00291 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 00292 if (it == m_connectedSlaves.end()) { 00293 return false; 00294 } 00295 PerSlaveQueue &jobs = it.value(); 00296 Q_FOREACH (SimpleJob *job, jobs.waitingList) { 00297 // ### for compatibility with the old scheduler we don't touch the running job, if any. 00298 // make sure that the job doesn't call back into Scheduler::cancelJob(); this would 00299 // a) crash and b) be unnecessary because we clean up just fine. 00300 SimpleJobPrivate::get(job)->m_schedSerial = 0; 00301 job->kill(); 00302 } 00303 m_connectedSlaves.erase(it); 00304 m_runnableSlaves.remove(slave); 00305 00306 slave->kill(); 00307 return true; 00308 } 00309 00310 // KDE5: only one caller, for doubtful reasons. remove this if possible. 00311 bool ConnectedSlaveQueue::isIdle(Slave *slave) 00312 { 00313 QHash<Slave *, PerSlaveQueue>::Iterator it = m_connectedSlaves.find(slave); 00314 if (it == m_connectedSlaves.end()) { 00315 return false; 00316 } 00317 return it.value().runningJob == 0; 00318 } 00319 00320 00321 //private slot 00322 void ConnectedSlaveQueue::startRunnableJobs() 00323 { 00324 QSet<Slave *>::Iterator it = m_runnableSlaves.begin(); 00325 while (it != m_runnableSlaves.end()) { 00326 Slave *slave = *it; 00327 if (!slave->isConnected()) { 00328 // this polling is somewhat inefficient... 00329 m_startJobsTimer.start(); 00330 ++it; 00331 continue; 00332 } 00333 it = m_runnableSlaves.erase(it); 00334 PerSlaveQueue &jobs = m_connectedSlaves[slave]; 00335 SimpleJob *job = jobs.waitingList.takeFirst(); 00336 Q_ASSERT(!jobs.runningJob); 00337 jobs.runningJob = job; 00338 00339 const KUrl url = job->url(); 00340 // no port is -1 in QUrl, but in kde3 we used 0 and the kioslaves assume that. 00341 const int port = url.port() == -1 ? 0 : url.port(); 00342 00343 if (slave->host() == "<reset>") { 00344 MetaData configData = SlaveConfig::self()->configData(url.protocol(), url.host()); 00345 slave->setConfig(configData); 00346 slave->setProtocol(url.protocol()); 00347 slave->setHost(url.host(), port, url.user(), url.pass()); 00348 } 00349 00350 Q_ASSERT(slave->protocol() == url.protocol()); 00351 Q_ASSERT(slave->host() == url.host()); 00352 Q_ASSERT(slave->port() == port); 00353 startJob(job, slave); 00354 } 00355 } 00356 00357 00358 static void ensureNoDuplicates(QMap<int, HostQueue *> *queuesBySerial) 00359 { 00360 Q_UNUSED(queuesBySerial); 00361 #ifdef SCHEDULER_DEBUG 00362 // a host queue may *never* be in queuesBySerial twice. 00363 QSet<HostQueue *> seen; 00364 Q_FOREACH (HostQueue *hq, *queuesBySerial) { 00365 Q_ASSERT(!seen.contains(hq)); 00366 seen.insert(hq); 00367 } 00368 #endif 00369 } 00370 00371 static void verifyRunningJobsCount(QHash<QString, HostQueue> *queues, int runningJobsCount) 00372 { 00373 Q_UNUSED(queues); 00374 Q_UNUSED(runningJobsCount); 00375 #ifdef SCHEDULER_DEBUG 00376 int realRunningJobsCount = 0; 00377 Q_FOREACH (const HostQueue &hq, *queues) { 00378 realRunningJobsCount += hq.runningJobsCount(); 00379 } 00380 Q_ASSERT(realRunningJobsCount == runningJobsCount); 00381 00382 // ...and of course we may never run the same job twice! 00383 QSet<SimpleJob *> seenJobs; 00384 Q_FOREACH (const HostQueue &hq, *queues) { 00385 Q_FOREACH (SimpleJob *job, hq.runningJobs()) { 00386 Q_ASSERT(!seenJobs.contains(job)); 00387 seenJobs.insert(job); 00388 } 00389 } 00390 #endif 00391 } 00392 00393 00394 ProtoQueue::ProtoQueue(SchedulerPrivate *sp, int maxSlaves, int maxSlavesPerHost) 00395 : m_schedPrivate(sp), 00396 m_maxConnectionsPerHost(maxSlavesPerHost ? maxSlavesPerHost : maxSlaves), 00397 m_maxConnectionsTotal(qMax(maxSlaves, maxSlavesPerHost)), 00398 m_runningJobsCount(0) 00399 00400 { 00401 kDebug(7006) << "m_maxConnectionsTotal:" << m_maxConnectionsTotal 00402 << "m_maxConnectionsPerHost:" << m_maxConnectionsPerHost; 00403 Q_ASSERT(m_maxConnectionsPerHost >= 1); 00404 Q_ASSERT(maxSlaves >= maxSlavesPerHost); 00405 m_startJobTimer.setSingleShot(true); 00406 connect (&m_startJobTimer, SIGNAL(timeout()), SLOT(startAJob())); 00407 } 00408 00409 ProtoQueue::~ProtoQueue() 00410 { 00411 Q_FOREACH (Slave *slave, allSlaves()) { 00412 // kill the slave process, then remove the interface in our process 00413 slave->kill(); 00414 slave->deref(); 00415 } 00416 } 00417 00418 void ProtoQueue::queueJob(SimpleJob *job) 00419 { 00420 QString hostname = SimpleJobPrivate::get(job)->m_url.host(); 00421 HostQueue &hq = m_queuesByHostname[hostname]; 00422 const int prevLowestSerial = hq.lowestSerial(); 00423 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); 00424 00425 // nevert insert a job twice 00426 Q_ASSERT(SimpleJobPrivate::get(job)->m_schedSerial == 0); 00427 SimpleJobPrivate::get(job)->m_schedSerial = m_serialPicker.next(); 00428 00429 const bool wasQueueEmpty = hq.isQueueEmpty(); 00430 hq.queueJob(job); 00431 // note that HostQueue::queueJob() into an empty queue changes its lowestSerial() too... 00432 // the queue's lowest serial job may have changed, so update the ordered list of queues. 00433 // however, we ignore all jobs that would cause more connections to a host than allowed. 00434 if (prevLowestSerial != hq.lowestSerial()) { 00435 if (hq.runningJobsCount() < m_maxConnectionsPerHost) { 00436 // if the connection limit didn't keep the HQ unscheduled it must have been lack of jobs 00437 if (m_queuesBySerial.remove(prevLowestSerial) == 0) { 00438 Q_UNUSED(wasQueueEmpty); 00439 Q_ASSERT(wasQueueEmpty); 00440 } 00441 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 00442 } else { 00443 #ifdef SCHEDULER_DEBUG 00444 // ### this assertion may fail if the limits were modified at runtime! 00445 // if the per-host connection limit is already reached the host queue's lowest serial 00446 // should not be queued. 00447 Q_ASSERT(!m_queuesBySerial.contains(prevLowestSerial)); 00448 #endif 00449 } 00450 } 00451 // just in case; startAJob() will refuse to start a job if it shouldn't. 00452 m_startJobTimer.start(); 00453 00454 ensureNoDuplicates(&m_queuesBySerial); 00455 } 00456 00457 void ProtoQueue::changeJobPriority(SimpleJob *job, int newPrio) 00458 { 00459 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); 00460 QHash<QString, HostQueue>::Iterator it = m_queuesByHostname.find(jobPriv->m_url.host()); 00461 if (it == m_queuesByHostname.end()) { 00462 return; 00463 } 00464 HostQueue &hq = it.value(); 00465 const int prevLowestSerial = hq.lowestSerial(); 00466 if (hq.isJobRunning(job) || !hq.removeJob(job)) { 00467 return; 00468 } 00469 jobPriv->m_schedSerial = m_serialPicker.changedPrioritySerial(jobPriv->m_schedSerial, newPrio); 00470 hq.queueJob(job); 00471 const bool needReinsert = hq.lowestSerial() != prevLowestSerial; 00472 // the host queue might be absent from m_queuesBySerial because the connections per host limit 00473 // for that host has been reached. 00474 if (needReinsert && m_queuesBySerial.remove(prevLowestSerial)) { 00475 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 00476 } 00477 ensureNoDuplicates(&m_queuesBySerial); 00478 } 00479 00480 void ProtoQueue::removeJob(SimpleJob *job) 00481 { 00482 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(job); 00483 HostQueue &hq = m_queuesByHostname[jobPriv->m_url.host()]; 00484 const int prevLowestSerial = hq.lowestSerial(); 00485 const int prevRunningJobs = hq.runningJobsCount(); 00486 00487 Q_ASSERT(hq.runningJobsCount() <= m_maxConnectionsPerHost); 00488 00489 if (hq.removeJob(job)) { 00490 if (hq.lowestSerial() != prevLowestSerial) { 00491 // we have dequeued the not yet running job with the lowest serial 00492 Q_ASSERT(!jobPriv->m_slave); 00493 Q_ASSERT(prevRunningJobs == hq.runningJobsCount()); 00494 if (m_queuesBySerial.remove(prevLowestSerial) == 0) { 00495 // make sure that the queue was not scheduled for a good reason 00496 Q_ASSERT(hq.runningJobsCount() == m_maxConnectionsPerHost); 00497 } 00498 } else { 00499 if (prevRunningJobs != hq.runningJobsCount()) { 00500 // we have dequeued a previously running job 00501 Q_ASSERT(prevRunningJobs - 1 == hq.runningJobsCount()); 00502 m_runningJobsCount--; 00503 Q_ASSERT(m_runningJobsCount >= 0); 00504 } 00505 } 00506 if (!hq.isQueueEmpty() && hq.runningJobsCount() < m_maxConnectionsPerHost) { 00507 // this may be a no-op, but it's faster than first checking if it's already in. 00508 m_queuesBySerial.insert(hq.lowestSerial(), &hq); 00509 } 00510 00511 if (hq.isEmpty()) { 00512 // no queued jobs, no running jobs. this destroys hq from above. 00513 m_queuesByHostname.remove(jobPriv->m_url.host()); 00514 } 00515 00516 if (jobPriv->m_slave && jobPriv->m_slave->isAlive()) { 00517 m_slaveKeeper.returnSlave(jobPriv->m_slave); 00518 } 00519 // just in case; startAJob() will refuse to start a job if it shouldn't. 00520 m_startJobTimer.start(); 00521 } else { 00522 // should be a connected slave 00523 // if the assertion fails the job has probably changed the host part of its URL while 00524 // running, so we can't find it by hostname. don't do this. 00525 const bool removed = m_connectedSlaveQueue.removeJob(job); 00526 Q_UNUSED(removed); 00527 Q_ASSERT(removed); 00528 } 00529 00530 ensureNoDuplicates(&m_queuesBySerial); 00531 } 00532 00533 Slave *ProtoQueue::createSlave(const QString &protocol, SimpleJob *job, const KUrl &url) 00534 { 00535 int error; 00536 QString errortext; 00537 Slave *slave = Slave::createSlave(protocol, url, error, errortext); 00538 if (slave) { 00539 scheduler()->connect(slave, SIGNAL(slaveDied(KIO::Slave *)), 00540 SLOT(slotSlaveDied(KIO::Slave *))); 00541 scheduler()->connect(slave, SIGNAL(slaveStatus(pid_t,const QByteArray&,const QString &, bool)), 00542 SLOT(slotSlaveStatus(pid_t,const QByteArray&, const QString &, bool))); 00543 } else { 00544 kError() << "couldn't create slave:" << errortext; 00545 if (job) { 00546 job->slotError(error, errortext); 00547 } 00548 } 00549 return slave; 00550 } 00551 00552 bool ProtoQueue::removeSlave (KIO::Slave *slave) 00553 { 00554 const bool removedConnected = m_connectedSlaveQueue.removeSlave(slave); 00555 const bool removedUnconnected = m_slaveKeeper.removeSlave(slave); 00556 Q_ASSERT(!(removedConnected && removedUnconnected)); 00557 return removedConnected || removedUnconnected; 00558 } 00559 00560 QList<Slave *> ProtoQueue::allSlaves() const 00561 { 00562 QList<Slave *> ret(m_slaveKeeper.allSlaves()); 00563 Q_FOREACH (const HostQueue &hq, m_queuesByHostname) { 00564 ret.append(hq.allSlaves()); 00565 } 00566 ret.append(m_connectedSlaveQueue.allSlaves()); 00567 return ret; 00568 } 00569 00570 //private slot 00571 void ProtoQueue::startAJob() 00572 { 00573 ensureNoDuplicates(&m_queuesBySerial); 00574 verifyRunningJobsCount(&m_queuesByHostname, m_runningJobsCount); 00575 00576 #ifdef SCHEDULER_DEBUG 00577 kDebug(7006) << "m_runningJobsCount:" << m_runningJobsCount; 00578 Q_FOREACH (const HostQueue &hq, m_queuesByHostname) { 00579 Q_FOREACH (SimpleJob *job, hq.runningJobs()) { 00580 kDebug(7006) << SimpleJobPrivate::get(job)->m_url; 00581 } 00582 } 00583 #endif 00584 if (m_runningJobsCount >= m_maxConnectionsTotal) { 00585 #ifdef SCHEDULER_DEBUG 00586 kDebug(7006) << "not starting any jobs because maxConnectionsTotal has been reached."; 00587 #endif 00588 return; 00589 } 00590 00591 QMap<int, HostQueue *>::iterator first = m_queuesBySerial.begin(); 00592 if (first != m_queuesBySerial.end()) { 00593 // pick a job and maintain the queue invariant: lower serials first 00594 HostQueue *hq = first.value(); 00595 const int prevLowestSerial = first.key(); 00596 Q_UNUSED(prevLowestSerial); 00597 Q_ASSERT(hq->lowestSerial() == prevLowestSerial); 00598 // the following assertions should hold due to queueJob(), takeFirstInQueue() and 00599 // removeJob() being correct 00600 Q_ASSERT(hq->runningJobsCount() < m_maxConnectionsPerHost); 00601 SimpleJob *startingJob = hq->takeFirstInQueue(); 00602 Q_ASSERT(hq->runningJobsCount() <= m_maxConnectionsPerHost); 00603 Q_ASSERT(hq->lowestSerial() != prevLowestSerial); 00604 00605 m_queuesBySerial.erase(first); 00606 // we've increased hq's runningJobsCount() by calling nexStartingJob() 00607 // so we need to check again. 00608 if (!hq->isQueueEmpty() && hq->runningJobsCount() < m_maxConnectionsPerHost) { 00609 m_queuesBySerial.insert(hq->lowestSerial(), hq); 00610 } 00611 00612 // always increase m_runningJobsCount because it's correct if there is a slave and if there 00613 // is no slave, removeJob() will balance the number again. removeJob() would decrease the 00614 // number too much otherwise. 00615 // Note that createSlave() can call slotError() on a job which in turn calls removeJob(), 00616 // so increase the count here already. 00617 m_runningJobsCount++; 00618 00619 bool isNewSlave = false; 00620 Slave *slave = m_slaveKeeper.takeSlaveForJob(startingJob); 00621 SimpleJobPrivate *jobPriv = SimpleJobPrivate::get(startingJob); 00622 if (!slave) { 00623 isNewSlave = true; 00624 slave = createSlave(jobPriv->m_protocol, startingJob, jobPriv->m_url); 00625 } 00626 00627 if (slave) { 00628 jobPriv->m_slave = slave; 00629 setupSlave(slave, jobPriv->m_url, jobPriv->m_protocol, jobPriv->m_proxyList, isNewSlave); 00630 startJob(startingJob, slave); 00631 } else { 00632 // dispose of our records about the job and mark the job as unknown 00633 // (to prevent crashes later) 00634 // note that the job's slotError() can have called removeJob() first, so check that 00635 // it's not a ghost job with null serial already. 00636 if (jobPriv->m_schedSerial) { 00637 removeJob(startingJob); 00638 jobPriv->m_schedSerial = 0; 00639 } 00640 } 00641 } else { 00642 #ifdef SCHEDULER_DEBUG 00643 kDebug(7006) << "not starting any jobs because there is no queued job."; 00644 #endif 00645 } 00646 00647 if (!m_queuesBySerial.isEmpty()) { 00648 m_startJobTimer.start(); 00649 } 00650 } 00651 00652 00653 00654 class KIO::SchedulerPrivate 00655 { 00656 public: 00657 SchedulerPrivate() 00658 : q(new Scheduler()), 00659 m_slaveOnHold(0), 00660 m_checkOnHold(true), // !! Always check with KLauncher for the first request 00661 m_ignoreConfigReparse(false) 00662 { 00663 } 00664 00665 ~SchedulerPrivate() 00666 { 00667 delete q; 00668 q = 0; 00669 Q_FOREACH (ProtoQueue *p, m_protocols) { 00670 Q_FOREACH (Slave *slave, p->allSlaves()) { 00671 slave->kill(); 00672 } 00673 p->deleteLater(); 00674 } 00675 } 00676 Scheduler *q; 00677 00678 Slave *m_slaveOnHold; 00679 KUrl m_urlOnHold; 00680 bool m_checkOnHold; 00681 bool m_ignoreConfigReparse; 00682 00683 SessionData sessionData; 00684 QMap<QObject *,WId> m_windowList; 00685 00686 void doJob(SimpleJob *job); 00687 #ifndef KDE_NO_DEPRECATED 00688 void scheduleJob(SimpleJob *job); 00689 #endif 00690 void setJobPriority(SimpleJob *job, int priority); 00691 void cancelJob(SimpleJob *job); 00692 void jobFinished(KIO::SimpleJob *job, KIO::Slave *slave); 00693 void putSlaveOnHold(KIO::SimpleJob *job, const KUrl &url); 00694 void removeSlaveOnHold(); 00695 Slave *getConnectedSlave(const KUrl &url, const KIO::MetaData &metaData); 00696 bool assignJobToSlave(KIO::Slave *slave, KIO::SimpleJob *job); 00697 bool disconnectSlave(KIO::Slave *slave); 00698 void checkSlaveOnHold(bool b); 00699 void publishSlaveOnHold(); 00700 Slave *heldSlaveForJob(KIO::SimpleJob *job); 00701 bool isSlaveOnHoldFor(const KUrl& url); 00702 void registerWindow(QWidget *wid); 00703 void updateInternalMetaData(SimpleJob* job); 00704 00705 MetaData metaDataFor(const QString &protocol, const QStringList &proxyList, const KUrl &url); 00706 void setupSlave(KIO::Slave *slave, const KUrl &url, const QString &protocol, 00707 const QStringList &proxyList, bool newSlave, const KIO::MetaData *config = 0); 00708 00709 void slotSlaveDied(KIO::Slave *slave); 00710 void slotSlaveStatus(pid_t pid, const QByteArray &protocol, 00711 const QString &host, bool connected); 00712 00713 void slotReparseSlaveConfiguration(const QString &, const QDBusMessage&); 00714 void slotSlaveOnHoldListChanged(); 00715 00716 void slotSlaveConnected(); 00717 void slotSlaveError(int error, const QString &errorMsg); 00718 void slotUnregisterWindow(QObject *); 00719 00720 ProtoQueue *protoQ(const QString &p) 00721 { 00722 ProtoQueue *pq = m_protocols.value(p, 0); 00723 if (!pq) { 00724 kDebug(7006) << "creating ProtoQueue instance for" << p; 00725 pq = new ProtoQueue(this, KProtocolInfo::maxSlaves(p), 00726 KProtocolInfo::maxSlavesPerHost(p)); 00727 m_protocols.insert(p, pq); 00728 } 00729 return pq; 00730 } 00731 private: 00732 QHash<QString, ProtoQueue *> m_protocols; 00733 }; 00734 00735 00736 K_GLOBAL_STATIC(SchedulerPrivate, schedulerPrivate) 00737 00738 Scheduler *Scheduler::self() 00739 { 00740 return schedulerPrivate->q; 00741 } 00742 00743 SchedulerPrivate *Scheduler::d_func() 00744 { 00745 return schedulerPrivate; 00746 } 00747 00748 //static 00749 Scheduler *scheduler() 00750 { 00751 return schedulerPrivate->q; 00752 } 00753 00754 //static 00755 Slave *heldSlaveForJob(SimpleJob *job) 00756 { 00757 return schedulerPrivate->heldSlaveForJob(job); 00758 } 00759 00760 00761 Scheduler::Scheduler() 00762 : removeMe(0) 00763 { 00764 setObjectName( "scheduler" ); 00765 00766 const QString dbusPath = "/KIO/Scheduler"; 00767 const QString dbusInterface = "org.kde.KIO.Scheduler"; 00768 QDBusConnection dbus = QDBusConnection::sessionBus(); 00769 dbus.registerObject( "/KIO/Scheduler", this, QDBusConnection::ExportScriptableSlots | 00770 QDBusConnection::ExportScriptableSignals ); 00771 dbus.connect(QString(), dbusPath, dbusInterface, "reparseSlaveConfiguration", 00772 this, SLOT(slotReparseSlaveConfiguration(QString,QDBusMessage))); 00773 dbus.connect(QString(), dbusPath, dbusInterface, "slaveOnHoldListChanged", 00774 this, SLOT(slotSlaveOnHoldListChanged())); 00775 } 00776 00777 Scheduler::~Scheduler() 00778 { 00779 } 00780 00781 void Scheduler::doJob(SimpleJob *job) 00782 { 00783 schedulerPrivate->doJob(job); 00784 } 00785 00786 #ifndef KDE_NO_DEPRECATED 00787 void Scheduler::scheduleJob(SimpleJob *job) 00788 { 00789 schedulerPrivate->scheduleJob(job); 00790 } 00791 #endif 00792 00793 void Scheduler::setJobPriority(SimpleJob *job, int priority) 00794 { 00795 schedulerPrivate->setJobPriority(job, priority); 00796 } 00797 00798 void Scheduler::cancelJob(SimpleJob *job) 00799 { 00800 schedulerPrivate->cancelJob(job); 00801 } 00802 00803 void Scheduler::jobFinished(KIO::SimpleJob *job, KIO::Slave *slave) 00804 { 00805 schedulerPrivate->jobFinished(job, slave); 00806 } 00807 00808 void Scheduler::putSlaveOnHold(KIO::SimpleJob *job, const KUrl &url) 00809 { 00810 schedulerPrivate->putSlaveOnHold(job, url); 00811 } 00812 00813 void Scheduler::removeSlaveOnHold() 00814 { 00815 schedulerPrivate->removeSlaveOnHold(); 00816 } 00817 00818 void Scheduler::publishSlaveOnHold() 00819 { 00820 schedulerPrivate->publishSlaveOnHold(); 00821 } 00822 00823 bool Scheduler::isSlaveOnHoldFor(const KUrl& url) 00824 { 00825 return schedulerPrivate->isSlaveOnHoldFor(url); 00826 } 00827 00828 void Scheduler::updateInternalMetaData(SimpleJob* job) 00829 { 00830 schedulerPrivate->updateInternalMetaData(job); 00831 } 00832 00833 KIO::Slave *Scheduler::getConnectedSlave(const KUrl &url, 00834 const KIO::MetaData &config ) 00835 { 00836 return schedulerPrivate->getConnectedSlave(url, config); 00837 } 00838 00839 bool Scheduler::assignJobToSlave(KIO::Slave *slave, KIO::SimpleJob *job) 00840 { 00841 return schedulerPrivate->assignJobToSlave(slave, job); 00842 } 00843 00844 bool Scheduler::disconnectSlave(KIO::Slave *slave) 00845 { 00846 return schedulerPrivate->disconnectSlave(slave); 00847 } 00848 00849 void Scheduler::registerWindow(QWidget *wid) 00850 { 00851 schedulerPrivate->registerWindow(wid); 00852 } 00853 00854 void Scheduler::unregisterWindow(QObject *wid) 00855 { 00856 schedulerPrivate->slotUnregisterWindow(wid); 00857 } 00858 00859 bool Scheduler::connect( const char *signal, const QObject *receiver, 00860 const char *member) 00861 { 00862 return QObject::connect(self(), signal, receiver, member); 00863 } 00864 00865 bool Scheduler::connect( const QObject* sender, const char* signal, 00866 const QObject* receiver, const char* member ) 00867 { 00868 return QObject::connect(sender, signal, receiver, member); 00869 } 00870 00871 bool Scheduler::disconnect( const QObject* sender, const char* signal, 00872 const QObject* receiver, const char* member ) 00873 { 00874 return QObject::disconnect(sender, signal, receiver, member); 00875 } 00876 00877 bool Scheduler::connect( const QObject *sender, const char *signal, 00878 const char *member ) 00879 { 00880 return QObject::connect(sender, signal, member); 00881 } 00882 00883 void Scheduler::checkSlaveOnHold(bool b) 00884 { 00885 schedulerPrivate->checkSlaveOnHold(b); 00886 } 00887 00888 void Scheduler::emitReparseSlaveConfiguration() 00889 { 00890 // Do it immediately in this process, otherwise we might send a request before reparsing 00891 // (e.g. when changing useragent in the plugin) 00892 schedulerPrivate->slotReparseSlaveConfiguration(QString(), QDBusMessage()); 00893 00894 schedulerPrivate->m_ignoreConfigReparse = true; 00895 emit self()->reparseSlaveConfiguration( QString() ); 00896 } 00897 00898 00899 void SchedulerPrivate::slotReparseSlaveConfiguration(const QString &proto, const QDBusMessage&) 00900 { 00901 if (m_ignoreConfigReparse) { 00902 kDebug(7006) << "Ignoring signal sent by myself"; 00903 m_ignoreConfigReparse = false; 00904 return; 00905 } 00906 00907 kDebug(7006) << "proto=" << proto; 00908 KProtocolManager::reparseConfiguration(); 00909 SlaveConfig::self()->reset(); 00910 sessionData.reset(); 00911 NetRC::self()->reload(); 00912 00913 QHash<QString, ProtoQueue *>::ConstIterator it = proto.isEmpty() ? m_protocols.constBegin() : 00914 m_protocols.constFind(proto); 00915 // not found? 00916 if (it == m_protocols.constEnd()) { 00917 return; 00918 } 00919 QHash<QString, ProtoQueue *>::ConstIterator endIt = proto.isEmpty() ? m_protocols.constEnd() : 00920 it + 1; 00921 for (; it != endIt; ++it) { 00922 Q_FOREACH(Slave *slave, (*it)->allSlaves()) { 00923 slave->send(CMD_REPARSECONFIGURATION); 00924 slave->resetHost(); 00925 } 00926 } 00927 } 00928 00929 void SchedulerPrivate::slotSlaveOnHoldListChanged() 00930 { 00931 m_checkOnHold = true; 00932 } 00933 00934 static bool mayReturnContent(int cmd, const QString& protocol) 00935 { 00936 if (cmd == CMD_GET) 00937 return true; 00938 00939 if (cmd == CMD_MULTI_GET) 00940 return true; 00941 00942 if (cmd == CMD_SPECIAL && protocol.startsWith(QLatin1String("http"), Qt::CaseInsensitive)) 00943 return true; 00944 00945 return false; 00946 } 00947 00948 void SchedulerPrivate::doJob(SimpleJob *job) 00949 { 00950 kDebug(7006) << job; 00951 if (QThread::currentThread() != QCoreApplication::instance()->thread()) { 00952 kWarning(7006) << "KIO is not thread-safe."; 00953 } 00954 00955 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 00956 jobPriv->m_proxyList.clear(); 00957 jobPriv->m_protocol = KProtocolManager::slaveProtocol(job->url(), jobPriv->m_proxyList); 00958 00959 if (mayReturnContent(jobCommand(job), jobPriv->m_protocol)) { 00960 jobPriv->m_checkOnHold = m_checkOnHold; 00961 m_checkOnHold = false; 00962 } 00963 00964 ProtoQueue *proto = protoQ(jobPriv->m_protocol); 00965 proto->queueJob(job); 00966 } 00967 00968 #ifndef KDE_NO_DEPRECATED 00969 void SchedulerPrivate::scheduleJob(SimpleJob *job) 00970 { 00971 kDebug(7006) << job; 00972 setJobPriority(job, 1); 00973 } 00974 #endif 00975 00976 void SchedulerPrivate::setJobPriority(SimpleJob *job, int priority) 00977 { 00978 kDebug(7006) << job << priority; 00979 ProtoQueue *proto = protoQ(SimpleJobPrivate::get(job)->m_protocol); 00980 proto->changeJobPriority(job, priority); 00981 } 00982 00983 void SchedulerPrivate::cancelJob(SimpleJob *job) 00984 { 00985 // this method is called all over the place in job.cpp, so just do this check here to avoid 00986 // much boilerplate in job code. 00987 if (SimpleJobPrivate::get(job)->m_schedSerial == 0) { 00988 //kDebug(7006) << "Doing nothing because I don't know job" << job; 00989 return; 00990 } 00991 Slave *slave = jobSlave(job); 00992 kDebug(7006) << job << slave; 00993 if (slave) { 00994 kDebug(7006) << "Scheduler: killing slave " << slave->slave_pid(); 00995 slave->kill(); 00996 } 00997 jobFinished(job, slave); 00998 } 00999 01000 void SchedulerPrivate::jobFinished(SimpleJob *job, Slave *slave) 01001 { 01002 kDebug(7006) << job << slave; 01003 if (QThread::currentThread() != QCoreApplication::instance()->thread()) { 01004 kWarning(7006) << "KIO is not thread-safe."; 01005 } 01006 01007 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 01008 01009 // make sure that we knew about the job! 01010 Q_ASSERT(jobPriv->m_schedSerial); 01011 01012 ProtoQueue *pq = m_protocols.value(jobPriv->m_protocol); 01013 if (pq) { 01014 pq->removeJob(job); 01015 } 01016 01017 if (slave) { 01018 // If we have internal meta-data, tell existing ioslaves to reload 01019 // their configuration. 01020 if (jobPriv->m_internalMetaData.count()) { 01021 kDebug(7006) << "Updating ioslaves with new internal metadata information"; 01022 ProtoQueue * queue = m_protocols.value(slave->protocol()); 01023 if (queue) { 01024 QListIterator<Slave*> it (queue->allSlaves()); 01025 while (it.hasNext()) { 01026 Slave* runningSlave = it.next(); 01027 if (slave->host() == runningSlave->host()) { 01028 slave->setConfig(metaDataFor(slave->protocol(), jobPriv->m_proxyList, job->url())); 01029 kDebug(7006) << "Updated configuration of" << slave->protocol() 01030 << "ioslave, pid=" << slave->slave_pid(); 01031 } 01032 } 01033 } 01034 } 01035 slave->setJob(0); 01036 slave->disconnect(job); 01037 } 01038 jobPriv->m_schedSerial = 0; // this marks the job as unscheduled again 01039 jobPriv->m_slave = 0; 01040 // Clear the values in the internal metadata container since they have 01041 // already been taken care of above... 01042 jobPriv->m_internalMetaData.clear(); 01043 } 01044 01045 // static 01046 void setupSlave(KIO::Slave *slave, const KUrl &url, const QString &protocol, 01047 const QStringList &proxyList , bool newSlave, const KIO::MetaData *config) 01048 { 01049 schedulerPrivate->setupSlave(slave, url, protocol, proxyList, newSlave, config); 01050 } 01051 01052 MetaData SchedulerPrivate::metaDataFor(const QString &protocol, const QStringList &proxyList, const KUrl &url) 01053 { 01054 const QString host = url.host(); 01055 MetaData configData = SlaveConfig::self()->configData(protocol, host); 01056 sessionData.configDataFor( configData, protocol, host ); 01057 if (proxyList.isEmpty()) { 01058 configData.remove(QLatin1String("UseProxy")); 01059 configData.remove(QLatin1String("ProxyUrls")); 01060 } else { 01061 configData[QLatin1String("UseProxy")] = proxyList.first(); 01062 configData[QLatin1String("ProxyUrls")] = proxyList.join(QLatin1String(",")); 01063 } 01064 01065 if ( configData.contains("EnableAutoLogin") && 01066 configData.value("EnableAutoLogin").compare("true", Qt::CaseInsensitive) == 0 ) 01067 { 01068 NetRC::AutoLogin l; 01069 l.login = url.user(); 01070 bool usern = (protocol == "ftp"); 01071 if ( NetRC::self()->lookup( url, l, usern) ) 01072 { 01073 configData["autoLoginUser"] = l.login; 01074 configData["autoLoginPass"] = l.password; 01075 if ( usern ) 01076 { 01077 QString macdef; 01078 QMap<QString, QStringList>::ConstIterator it = l.macdef.constBegin(); 01079 for ( ; it != l.macdef.constEnd(); ++it ) 01080 macdef += it.key() + '\\' + it.value().join( "\\" ) + '\n'; 01081 configData["autoLoginMacro"] = macdef; 01082 } 01083 } 01084 } 01085 01086 return configData; 01087 } 01088 01089 void SchedulerPrivate::setupSlave(KIO::Slave *slave, const KUrl &url, const QString &protocol, 01090 const QStringList &proxyList, bool newSlave, const KIO::MetaData *config) 01091 { 01092 int port = url.port(); 01093 if ( port == -1 ) // no port is -1 in QUrl, but in kde3 we used 0 and the kioslaves assume that. 01094 port = 0; 01095 const QString host = url.host(); 01096 const QString user = url.user(); 01097 const QString passwd = url.pass(); 01098 01099 if (newSlave || slave->host() != host || slave->port() != port || 01100 slave->user() != user || slave->passwd() != passwd) { 01101 01102 MetaData configData = metaDataFor(protocol, proxyList, url); 01103 if (config) 01104 configData += *config; 01105 01106 slave->setConfig(configData); 01107 slave->setProtocol(url.protocol()); 01108 slave->setHost(host, port, user, passwd); 01109 } 01110 } 01111 01112 01113 void SchedulerPrivate::slotSlaveStatus(pid_t, const QByteArray&, const QString &, bool) 01114 { 01115 } 01116 01117 01118 void SchedulerPrivate::slotSlaveDied(KIO::Slave *slave) 01119 { 01120 kDebug(7006) << slave; 01121 Q_ASSERT(slave); 01122 Q_ASSERT(!slave->isAlive()); 01123 ProtoQueue *pq = m_protocols.value(slave->protocol()); 01124 if (pq) { 01125 if (slave->job()) { 01126 pq->removeJob(slave->job()); 01127 } 01128 // in case this was a connected slave... 01129 pq->removeSlave(slave); 01130 } 01131 if (slave == m_slaveOnHold) { 01132 m_slaveOnHold = 0; 01133 m_urlOnHold.clear(); 01134 } 01135 slave->deref(); // Delete slave 01136 } 01137 01138 void SchedulerPrivate::putSlaveOnHold(KIO::SimpleJob *job, const KUrl &url) 01139 { 01140 Slave *slave = jobSlave(job); 01141 kDebug(7006) << job << url << slave; 01142 slave->disconnect(job); 01143 // prevent the fake death of the slave from trying to kill the job again; 01144 // cf. Slave::hold(const KUrl &url) called in SchedulerPrivate::publishSlaveOnHold(). 01145 slave->setJob(0); 01146 SimpleJobPrivate::get(job)->m_slave = 0; 01147 01148 if (m_slaveOnHold) { 01149 m_slaveOnHold->kill(); 01150 } 01151 m_slaveOnHold = slave; 01152 m_urlOnHold = url; 01153 m_slaveOnHold->suspend(); 01154 } 01155 01156 void SchedulerPrivate::publishSlaveOnHold() 01157 { 01158 kDebug(7006) << m_slaveOnHold; 01159 if (!m_slaveOnHold) 01160 return; 01161 01162 m_slaveOnHold->hold(m_urlOnHold); 01163 emit q->slaveOnHoldListChanged(); 01164 } 01165 01166 bool SchedulerPrivate::isSlaveOnHoldFor(const KUrl& url) 01167 { 01168 if (url.isValid() && m_urlOnHold.isValid() && url == m_urlOnHold) 01169 return true; 01170 01171 return Slave::checkForHeldSlave(url); 01172 } 01173 01174 Slave *SchedulerPrivate::heldSlaveForJob(SimpleJob *job) 01175 { 01176 Slave *slave = 0; 01177 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 01178 01179 if (jobPriv->m_checkOnHold) { 01180 slave = Slave::holdSlave(jobPriv->m_protocol, job->url()); 01181 } 01182 01183 if (!slave && m_slaveOnHold) { 01184 // Make sure that the job wants to do a GET or a POST, and with no offset 01185 const int cmd = jobPriv->m_command; 01186 bool canJobReuse = (cmd == CMD_GET || cmd == CMD_MULTI_GET); 01187 01188 if (KIO::TransferJob *tJob = qobject_cast<KIO::TransferJob *>(job)) { 01189 canJobReuse = ( canJobReuse || cmd == CMD_SPECIAL ); 01190 if (canJobReuse) { 01191 KIO::MetaData outgoing = tJob->outgoingMetaData(); 01192 const QString resume = outgoing.value("resume"); 01193 kDebug(7006) << "Resume metadata is" << resume; 01194 canJobReuse = (resume.isEmpty() || resume == "0"); 01195 } 01196 } 01197 01198 if (job->url() == m_urlOnHold) { 01199 if (canJobReuse) { 01200 kDebug(7006) << "HOLD: Reusing held slave (" << m_slaveOnHold << ")"; 01201 slave = m_slaveOnHold; 01202 } else { 01203 kDebug(7006) << "HOLD: Discarding held slave (" << m_slaveOnHold << ")"; 01204 m_slaveOnHold->kill(); 01205 } 01206 m_slaveOnHold = 0; 01207 m_urlOnHold.clear(); 01208 } 01209 } else if (slave) { 01210 kDebug(7006) << "HOLD: Reusing klauncher held slave (" << slave << ")"; 01211 } 01212 01213 return slave; 01214 } 01215 01216 void SchedulerPrivate::removeSlaveOnHold() 01217 { 01218 kDebug(7006) << m_slaveOnHold; 01219 if (m_slaveOnHold) { 01220 m_slaveOnHold->kill(); 01221 } 01222 m_slaveOnHold = 0; 01223 m_urlOnHold.clear(); 01224 } 01225 01226 Slave *SchedulerPrivate::getConnectedSlave(const KUrl &url, const KIO::MetaData &config) 01227 { 01228 QStringList proxyList; 01229 const QString protocol = KProtocolManager::slaveProtocol(url, proxyList); 01230 ProtoQueue *pq = protoQ(protocol); 01231 01232 Slave *slave = pq->createSlave(protocol, /* job */0, url); 01233 if (slave) { 01234 setupSlave(slave, url, protocol, proxyList, true, &config); 01235 pq->m_connectedSlaveQueue.addSlave(slave); 01236 01237 slave->send( CMD_CONNECT ); 01238 q->connect(slave, SIGNAL(connected()), 01239 SLOT(slotSlaveConnected())); 01240 q->connect(slave, SIGNAL(error(int, const QString &)), 01241 SLOT(slotSlaveError(int, const QString &))); 01242 } 01243 kDebug(7006) << url << slave; 01244 return slave; 01245 } 01246 01247 01248 void SchedulerPrivate::slotSlaveConnected() 01249 { 01250 kDebug(7006); 01251 Slave *slave = static_cast<Slave *>(q->sender()); 01252 slave->setConnected(true); 01253 q->disconnect(slave, SIGNAL(connected()), q, SLOT(slotSlaveConnected())); 01254 emit q->slaveConnected(slave); 01255 } 01256 01257 void SchedulerPrivate::slotSlaveError(int errorNr, const QString &errorMsg) 01258 { 01259 Slave *slave = static_cast<Slave *>(q->sender()); 01260 kDebug(7006) << slave << errorNr << errorMsg; 01261 ProtoQueue *pq = protoQ(slave->protocol()); 01262 if (!slave->isConnected() || pq->m_connectedSlaveQueue.isIdle(slave)) { 01263 // Only forward to application if slave is idle or still connecting. 01264 // ### KDE5: can we remove this apparently arbitrary behavior and just always emit SlaveError? 01265 emit q->slaveError(slave, errorNr, errorMsg); 01266 } 01267 } 01268 01269 bool SchedulerPrivate::assignJobToSlave(KIO::Slave *slave, SimpleJob *job) 01270 { 01271 kDebug(7006) << slave << job; 01272 // KDE5: queueing of jobs can probably be removed, it provides very little benefit 01273 ProtoQueue *pq = m_protocols.value(slave->protocol()); 01274 pq->removeJob(job); 01275 return (pq ? pq->m_connectedSlaveQueue.queueJob(job, slave) : false); 01276 } 01277 01278 bool SchedulerPrivate::disconnectSlave(KIO::Slave *slave) 01279 { 01280 kDebug(7006) << slave; 01281 ProtoQueue *pq = m_protocols.value(slave->protocol()); 01282 return (pq ? pq->m_connectedSlaveQueue.removeSlave(slave) : false); 01283 } 01284 01285 void SchedulerPrivate::checkSlaveOnHold(bool b) 01286 { 01287 kDebug(7006) << b; 01288 m_checkOnHold = b; 01289 } 01290 01291 void SchedulerPrivate::registerWindow(QWidget *wid) 01292 { 01293 if (!wid) 01294 return; 01295 01296 QWidget* window = wid->window(); 01297 01298 QObject *obj = static_cast<QObject *>(window); 01299 if (!m_windowList.contains(obj)) 01300 { 01301 // We must store the window Id because by the time 01302 // the destroyed signal is emitted we can no longer 01303 // access QWidget::winId() (already destructed) 01304 WId windowId = window->winId(); 01305 m_windowList.insert(obj, windowId); 01306 q->connect(window, SIGNAL(destroyed(QObject *)), 01307 SLOT(slotUnregisterWindow(QObject*))); 01308 QDBusInterface("org.kde.kded", "/kded", "org.kde.kded"). 01309 call(QDBus::NoBlock, "registerWindowId", qlonglong(windowId)); 01310 } 01311 } 01312 01313 void SchedulerPrivate::slotUnregisterWindow(QObject *obj) 01314 { 01315 if (!obj) 01316 return; 01317 01318 QMap<QObject *, WId>::Iterator it = m_windowList.find(obj); 01319 if (it == m_windowList.end()) 01320 return; 01321 WId windowId = it.value(); 01322 q->disconnect(it.key(), SIGNAL(destroyed(QObject *)), 01323 q, SLOT(slotUnregisterWindow(QObject*))); 01324 m_windowList.erase( it ); 01325 QDBusInterface("org.kde.kded", "/kded", "org.kde.kded"). 01326 call(QDBus::NoBlock, "unregisterWindowId", qlonglong(windowId)); 01327 } 01328 01329 void SchedulerPrivate::updateInternalMetaData(SimpleJob* job) 01330 { 01331 KIO::SimpleJobPrivate *const jobPriv = SimpleJobPrivate::get(job); 01332 // Preserve all internal meta-data so they can be sent back to the 01333 // ioslaves as needed... 01334 const KUrl jobUrl = job->url(); 01335 kDebug(7006) << job << jobPriv->m_internalMetaData; 01336 QMapIterator<QString, QString> it (jobPriv->m_internalMetaData); 01337 while (it.hasNext()) { 01338 it.next(); 01339 if (it.key().startsWith(QLatin1String("{internal~currenthost}"), Qt::CaseInsensitive)) { 01340 SlaveConfig::self()->setConfigData(jobUrl.protocol(), jobUrl.host(), it.key().mid(22), it.value()); 01341 } else if (it.key().startsWith(QLatin1String("{internal~allhosts}"), Qt::CaseInsensitive)) { 01342 SlaveConfig::self()->setConfigData(jobUrl.protocol(), QString(), it.key().mid(19), it.value()); 01343 } 01344 } 01345 } 01346 01347 01348 #include "scheduler.moc" 01349 #include "scheduler_p.moc"
KDE 4.7 API Reference