KIO
connection.cpp
Go to the documentation of this file.
00001 /* This file is part of the KDE libraries 00002 Copyright (C) 2000 Stephan Kulow <coolo@kde.org> 00003 David Faure <faure@kde.org> 00004 Copyright (C) 2007 Thiago Macieira <thiago@kde.org> 00005 00006 This library is free software; you can redistribute it and/or 00007 modify it under the terms of the GNU Library General Public 00008 License as published by the Free Software Foundation; either 00009 version 2 of the License, or (at your option) any later version. 00010 00011 This library is distributed in the hope that it will be useful, 00012 but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00014 Library General Public License for more details. 00015 00016 You should have received a copy of the GNU Library General Public License 00017 along with this library; see the file COPYING.LIB. If not, write to 00018 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00019 Boston, MA 02110-1301, USA. 00020 */ 00021 00022 #include "connection.h" 00023 #include "connection_p.h" 00024 00025 #include <errno.h> 00026 00027 #include <QQueue> 00028 #include <QPointer> 00029 #include <QTime> 00030 00031 #include "kdebug.h" 00032 #include "kcomponentdata.h" 00033 #include "kglobal.h" 00034 #include "klocale.h" 00035 #include "kstandarddirs.h" 00036 #include "ktemporaryfile.h" 00037 #include "kurl.h" 00038 00039 using namespace KIO; 00040 00041 class KIO::ConnectionPrivate 00042 { 00043 public: 00044 inline ConnectionPrivate() 00045 : backend(0), suspended(false) 00046 { } 00047 00048 void dequeue(); 00049 void commandReceived(const Task &task); 00050 void disconnected(); 00051 void setBackend(AbstractConnectionBackend *b); 00052 00053 QQueue<Task> outgoingTasks; 00054 QQueue<Task> incomingTasks; 00055 AbstractConnectionBackend *backend; 00056 Connection *q; 00057 bool suspended; 00058 }; 00059 00060 class KIO::ConnectionServerPrivate 00061 { 00062 public: 00063 inline ConnectionServerPrivate() 00064 : backend(0) 00065 { } 00066 00067 ConnectionServer *q; 00068 AbstractConnectionBackend *backend; 00069 }; 00070 00071 void ConnectionPrivate::dequeue() 00072 { 00073 if (!backend || suspended) 00074 return; 00075 00076 while (!outgoingTasks.isEmpty()) { 00077 const Task task = outgoingTasks.dequeue(); 00078 q->sendnow(task.cmd, task.data); 00079 } 00080 00081 if (!incomingTasks.isEmpty()) 00082 emit q->readyRead(); 00083 } 00084 00085 void ConnectionPrivate::commandReceived(const Task &task) 00086 { 00087 //kDebug() << this << "Command " << task.cmd << " added to the queue"; 00088 if (!suspended && incomingTasks.isEmpty()) 00089 QMetaObject::invokeMethod(q, "dequeue", Qt::QueuedConnection); 00090 incomingTasks.enqueue(task); 00091 } 00092 00093 void ConnectionPrivate::disconnected() 00094 { 00095 q->close(); 00096 QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection); 00097 } 00098 00099 void ConnectionPrivate::setBackend(AbstractConnectionBackend *b) 00100 { 00101 backend = b; 00102 if (backend) { 00103 q->connect(backend, SIGNAL(commandReceived(Task)), SLOT(commandReceived(Task))); 00104 q->connect(backend, SIGNAL(disconnected()), SLOT(disconnected())); 00105 backend->setSuspended(suspended); 00106 } 00107 } 00108 00109 AbstractConnectionBackend::AbstractConnectionBackend(QObject *parent) 00110 : QObject(parent), state(Idle) 00111 { 00112 } 00113 00114 AbstractConnectionBackend::~AbstractConnectionBackend() 00115 { 00116 } 00117 00118 SocketConnectionBackend::SocketConnectionBackend(Mode m, QObject *parent) 00119 : AbstractConnectionBackend(parent), socket(0), len(-1), cmd(0), 00120 signalEmitted(false), mode(m) 00121 { 00122 localServer = 0; 00123 //tcpServer = 0; 00124 } 00125 00126 SocketConnectionBackend::~SocketConnectionBackend() 00127 { 00128 if (mode == LocalSocketMode && localServer && 00129 localServer->localSocketType() == KLocalSocket::UnixSocket) 00130 QFile::remove(localServer->localPath()); 00131 } 00132 00133 void SocketConnectionBackend::setSuspended(bool enable) 00134 { 00135 if (state != Connected) 00136 return; 00137 Q_ASSERT(socket); 00138 Q_ASSERT(!localServer); // !tcpServer as well 00139 00140 if (enable) { 00141 //kDebug() << this << " suspending"; 00142 socket->setReadBufferSize(1); 00143 } else { 00144 //kDebug() << this << " resuming"; 00145 socket->setReadBufferSize(StandardBufferSize); 00146 if (socket->bytesAvailable() >= HeaderSize) { 00147 // there are bytes available 00148 QMetaObject::invokeMethod(this, "socketReadyRead", Qt::QueuedConnection); 00149 } 00150 00151 // We read all bytes here, but we don't use readAll() because we need 00152 // to read at least one byte (even if there isn't any) so that the 00153 // socket notifier is reenabled 00154 QByteArray data = socket->read(socket->bytesAvailable() + 1); 00155 for (int i = data.size(); --i >= 0; ) 00156 socket->ungetChar(data[i]); 00157 } 00158 } 00159 00160 bool SocketConnectionBackend::connectToRemote(const KUrl &url) 00161 { 00162 Q_ASSERT(state == Idle); 00163 Q_ASSERT(!socket); 00164 Q_ASSERT(!localServer); // !tcpServer as well 00165 00166 if (mode == LocalSocketMode) { 00167 KLocalSocket *sock = new KLocalSocket(this); 00168 QString path = url.path(); 00169 KLocalSocket::LocalSocketType type = KLocalSocket::UnixSocket; 00170 00171 if (url.queryItem(QLatin1String("abstract")) == QLatin1String("1")) 00172 type = KLocalSocket::AbstractUnixSocket; 00173 00174 sock->connectToPath(path); 00175 socket = sock; 00176 } else { 00177 socket = new QTcpSocket(this); 00178 socket->connectToHost(url.host(),url.port()); 00179 00180 if (!socket->waitForConnected(1000)) { 00181 state = Idle; 00182 kDebug() << "could not connect to " << url; 00183 return false; 00184 } 00185 } 00186 connect(socket, SIGNAL(readyRead()), SLOT(socketReadyRead())); 00187 connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected())); 00188 state = Connected; 00189 return true; 00190 } 00191 00192 void SocketConnectionBackend::socketDisconnected() 00193 { 00194 state = Idle; 00195 emit disconnected(); 00196 } 00197 00198 bool SocketConnectionBackend::listenForRemote() 00199 { 00200 Q_ASSERT(state == Idle); 00201 Q_ASSERT(!socket); 00202 Q_ASSERT(!localServer); // !tcpServer as well 00203 00204 if (mode == LocalSocketMode) { 00205 QString prefix = KStandardDirs::locateLocal("socket", KGlobal::mainComponent().componentName()); 00206 KTemporaryFile *socketfile = new KTemporaryFile(); 00207 socketfile->setPrefix(prefix); 00208 socketfile->setSuffix(QLatin1String(".slave-socket")); 00209 if (!socketfile->open()) 00210 { 00211 errorString = i18n("Unable to create io-slave: %1", strerror(errno)); 00212 delete socketfile; 00213 return false; 00214 } 00215 00216 QString sockname = socketfile->fileName(); 00217 KUrl addressUrl(sockname); 00218 addressUrl.setProtocol("local"); 00219 address = addressUrl.url(); 00220 delete socketfile; // can't bind if there is such a file 00221 00222 localServer = new KLocalSocketServer(this); 00223 if (!localServer->listen(sockname, KLocalSocket::UnixSocket)) { 00224 errorString = localServer->errorString(); 00225 delete localServer; 00226 return false; 00227 } 00228 00229 connect(localServer, SIGNAL(newConnection()), SIGNAL(newConnection())); 00230 } else { 00231 tcpServer = new QTcpServer(this); 00232 tcpServer->listen(QHostAddress::LocalHost); 00233 if (!tcpServer->isListening()) { 00234 errorString = tcpServer->errorString(); 00235 delete tcpServer; 00236 return false; 00237 } 00238 00239 address = "tcp://127.0.0.1:" + QString::number(tcpServer->serverPort()); 00240 connect(tcpServer, SIGNAL(newConnection()), SIGNAL(newConnection())); 00241 } 00242 00243 state = Listening; 00244 return true; 00245 } 00246 00247 bool SocketConnectionBackend::waitForIncomingTask(int ms) 00248 { 00249 Q_ASSERT(state == Connected); 00250 Q_ASSERT(socket); 00251 if (socket->state() != QAbstractSocket::ConnectedState) { 00252 state = Idle; 00253 return false; // socket has probably closed, what do we do? 00254 } 00255 00256 signalEmitted = false; 00257 if (socket->bytesAvailable()) 00258 socketReadyRead(); 00259 if (signalEmitted) 00260 return true; // there was enough data in the socket 00261 00262 // not enough data in the socket, so wait for more 00263 QTime timer; 00264 timer.start(); 00265 00266 while (socket->state() == QAbstractSocket::ConnectedState && !signalEmitted && 00267 (ms == -1 || timer.elapsed() < ms)) 00268 if (!socket->waitForReadyRead(ms == -1 ? -1 : ms - timer.elapsed())) 00269 break; 00270 00271 if (signalEmitted) 00272 return true; 00273 if (socket->state() != QAbstractSocket::ConnectedState) 00274 state = Idle; 00275 return false; 00276 } 00277 00278 bool SocketConnectionBackend::sendCommand(const Task &task) 00279 { 00280 Q_ASSERT(state == Connected); 00281 Q_ASSERT(socket); 00282 00283 static char buffer[HeaderSize + 2]; 00284 sprintf(buffer, "%6x_%2x_", task.data.size(), task.cmd); 00285 socket->write(buffer, HeaderSize); 00286 socket->write(task.data); 00287 00288 //kDebug() << this << " Sending command " << hex << task.cmd << " of " 00289 // << task.data.size() << " bytes (" << socket->bytesToWrite() 00290 // << " bytes left to write"; 00291 00292 // blocking mode: 00293 while (socket->bytesToWrite() > 0 && socket->state() == QAbstractSocket::ConnectedState) 00294 socket->waitForBytesWritten(-1); 00295 00296 return socket->state() == QAbstractSocket::ConnectedState; 00297 } 00298 00299 AbstractConnectionBackend *SocketConnectionBackend::nextPendingConnection() 00300 { 00301 Q_ASSERT(state == Listening); 00302 Q_ASSERT(localServer || tcpServer); 00303 Q_ASSERT(!socket); 00304 00305 //kDebug() << "Got a new connection"; 00306 00307 QTcpSocket *newSocket; 00308 if (mode == LocalSocketMode) 00309 newSocket = localServer->nextPendingConnection(); 00310 else 00311 newSocket = tcpServer->nextPendingConnection(); 00312 if (!newSocket) 00313 return 0; // there was no connection... 00314 00315 SocketConnectionBackend *result = new SocketConnectionBackend(Mode(mode)); 00316 result->state = Connected; 00317 result->socket = newSocket; 00318 newSocket->setParent(result); 00319 connect(newSocket, SIGNAL(readyRead()), result, SLOT(socketReadyRead())); 00320 connect(newSocket, SIGNAL(disconnected()), result, SLOT(socketDisconnected())); 00321 00322 return result; 00323 } 00324 00325 void SocketConnectionBackend::socketReadyRead() 00326 { 00327 bool shouldReadAnother; 00328 do { 00329 if (!socket) 00330 // might happen if the invokeMethods were delivered after we disconnected 00331 return; 00332 00333 // kDebug() << this << "Got " << socket->bytesAvailable() << " bytes"; 00334 if (len == -1) { 00335 // We have to read the header 00336 static char buffer[HeaderSize]; 00337 00338 if (socket->bytesAvailable() < HeaderSize) { 00339 return; // wait for more data 00340 } 00341 00342 socket->read(buffer, sizeof buffer); 00343 buffer[6] = 0; 00344 buffer[9] = 0; 00345 00346 char *p = buffer; 00347 while( *p == ' ' ) p++; 00348 len = strtol( p, 0L, 16 ); 00349 00350 p = buffer + 7; 00351 while( *p == ' ' ) p++; 00352 cmd = strtol( p, 0L, 16 ); 00353 00354 // kDebug() << this << " Beginning of command " << hex << cmd << " of size " 00355 // << len; 00356 } 00357 00358 QPointer<SocketConnectionBackend> that = this; 00359 00360 // kDebug() << this << "Want to read " << len << " bytes"; 00361 if (socket->bytesAvailable() >= len) { 00362 Task task; 00363 task.cmd = cmd; 00364 if (len) 00365 task.data = socket->read(len); 00366 len = -1; 00367 00368 signalEmitted = true; 00369 emit commandReceived(task); 00370 } else if (len > StandardBufferSize) { 00371 kDebug(7017) << this << "Jumbo packet of" << len << "bytes"; 00372 socket->setReadBufferSize(len + 1); 00373 } 00374 00375 // If we're dead, better don't try anything. 00376 if (that.isNull()) 00377 return; 00378 00379 // Do we have enough for an another read? 00380 if (len == -1) 00381 shouldReadAnother = socket->bytesAvailable() >= HeaderSize; 00382 else 00383 shouldReadAnother = socket->bytesAvailable() >= len; 00384 } 00385 while (shouldReadAnother); 00386 } 00387 00388 Connection::Connection(QObject *parent) 00389 : QObject(parent), d(new ConnectionPrivate) 00390 { 00391 d->q = this; 00392 } 00393 00394 Connection::~Connection() 00395 { 00396 close(); 00397 delete d; 00398 } 00399 00400 void Connection::suspend() 00401 { 00402 //kDebug() << this << "Suspended"; 00403 d->suspended = true; 00404 if (d->backend) 00405 d->backend->setSuspended(true); 00406 } 00407 00408 void Connection::resume() 00409 { 00410 // send any outgoing or incoming commands that may be in queue 00411 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection); 00412 00413 //kDebug() << this << "Resumed"; 00414 d->suspended = false; 00415 if (d->backend) 00416 d->backend->setSuspended(false); 00417 } 00418 00419 void Connection::close() 00420 { 00421 if (d->backend) { 00422 d->backend->disconnect(this); 00423 d->backend->deleteLater(); 00424 d->backend = 0; 00425 } 00426 d->outgoingTasks.clear(); 00427 d->incomingTasks.clear(); 00428 } 00429 00430 bool Connection::isConnected() const 00431 { 00432 return d->backend && d->backend->state == AbstractConnectionBackend::Connected; 00433 } 00434 00435 bool Connection::inited() const 00436 { 00437 return d->backend; 00438 } 00439 00440 bool Connection::suspended() const 00441 { 00442 return d->suspended; 00443 } 00444 00445 void Connection::connectToRemote(const QString &address) 00446 { 00447 //kDebug(7017) << "Connection requested to " << address; 00448 KUrl url = address; 00449 QString scheme = url.protocol(); 00450 00451 if (scheme == QLatin1String("local")) { 00452 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this)); 00453 } else if (scheme == QLatin1String("tcp")) { 00454 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this)); 00455 } else { 00456 kWarning(7017) << "Unknown requested KIO::Connection protocol='" << scheme 00457 << "' (" << address << ")"; 00458 Q_ASSERT(0); 00459 return; 00460 } 00461 00462 // connection succeeded 00463 if (!d->backend->connectToRemote(url)) { 00464 //kWarning(7017) << "could not connect to " << url << "using scheme" << scheme ; 00465 delete d->backend; 00466 d->backend = 0; 00467 return; 00468 } 00469 00470 d->dequeue(); 00471 } 00472 00473 QString Connection::errorString() const 00474 { 00475 if (d->backend) 00476 return d->backend->errorString; 00477 return QString(); 00478 } 00479 00480 bool Connection::send(int cmd, const QByteArray& data) 00481 { 00482 if (!inited() || !d->outgoingTasks.isEmpty()) { 00483 Task task; 00484 task.cmd = cmd; 00485 task.data = data; 00486 d->outgoingTasks.enqueue(task); 00487 return true; 00488 } else { 00489 return sendnow(cmd, data); 00490 } 00491 } 00492 00493 bool Connection::sendnow(int _cmd, const QByteArray &data) 00494 { 00495 if (data.size() > 0xffffff) 00496 return false; 00497 00498 if (!isConnected()) 00499 return false; 00500 00501 //kDebug() << this << "Sending command " << _cmd << " of size " << data.size(); 00502 Task task; 00503 task.cmd = _cmd; 00504 task.data = data; 00505 return d->backend->sendCommand(task); 00506 } 00507 00508 bool Connection::hasTaskAvailable() const 00509 { 00510 return !d->incomingTasks.isEmpty(); 00511 } 00512 00513 bool Connection::waitForIncomingTask(int ms) 00514 { 00515 if (!isConnected()) 00516 return false; 00517 00518 if (d->backend) 00519 return d->backend->waitForIncomingTask(ms); 00520 return false; 00521 } 00522 00523 int Connection::read( int* _cmd, QByteArray &data ) 00524 { 00525 // if it's still empty, then it's an error 00526 if (d->incomingTasks.isEmpty()) { 00527 //kWarning() << this << "Task list is empty!"; 00528 return -1; 00529 } 00530 const Task task = d->incomingTasks.dequeue(); 00531 //kDebug() << this << "Command " << task.cmd << " removed from the queue (size " 00532 // << task.data.size() << ")"; 00533 *_cmd = task.cmd; 00534 data = task.data; 00535 00536 // if we didn't empty our reading queue, emit again 00537 if (!d->suspended && !d->incomingTasks.isEmpty()) 00538 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection); 00539 00540 return data.size(); 00541 } 00542 00543 ConnectionServer::ConnectionServer(QObject *parent) 00544 : QObject(parent), d(new ConnectionServerPrivate) 00545 { 00546 d->q = this; 00547 } 00548 00549 ConnectionServer::~ConnectionServer() 00550 { 00551 delete d; 00552 } 00553 00554 void ConnectionServer::listenForRemote() 00555 { 00556 #ifdef Q_WS_WIN 00557 d->backend = new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this); 00558 #else 00559 d->backend = new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this); 00560 #endif 00561 if (!d->backend->listenForRemote()) { 00562 delete d->backend; 00563 d->backend = 0; 00564 return; 00565 } 00566 00567 connect(d->backend, SIGNAL(newConnection()), SIGNAL(newConnection())); 00568 kDebug(7017) << "Listening on " << d->backend->address; 00569 } 00570 00571 QString ConnectionServer::address() const 00572 { 00573 if (d->backend) 00574 return d->backend->address; 00575 return QString(); 00576 } 00577 00578 bool ConnectionServer::isListening() const 00579 { 00580 return d->backend && d->backend->state == AbstractConnectionBackend::Listening; 00581 } 00582 00583 void ConnectionServer::close() 00584 { 00585 delete d->backend; 00586 d->backend = 0; 00587 } 00588 00589 Connection *ConnectionServer::nextPendingConnection() 00590 { 00591 if (!isListening()) 00592 return 0; 00593 00594 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection(); 00595 if (!newBackend) 00596 return 0; // no new backend... 00597 00598 Connection *result = new Connection; 00599 result->d->setBackend(newBackend); 00600 newBackend->setParent(result); 00601 00602 return result; 00603 } 00604 00605 void ConnectionServer::setNextPendingConnection(Connection *conn) 00606 { 00607 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection(); 00608 Q_ASSERT(newBackend); 00609 00610 conn->d->backend = newBackend; 00611 conn->d->setBackend(newBackend); 00612 newBackend->setParent(conn); 00613 00614 conn->d->dequeue(); 00615 } 00616 00617 #include "connection_p.moc" 00618 #include "connection.moc"
KDE 4.6 API Reference