KIO
connection.cpp
Go to the documentation of this file.
00001 /* This file is part of the KDE libraries 00002 Copyright (C) 2000 Stephan Kulow <coolo@kde.org> 00003 David Faure <faure@kde.org> 00004 Copyright (C) 2007 Thiago Macieira <thiago@kde.org> 00005 00006 This library is free software; you can redistribute it and/or 00007 modify it under the terms of the GNU Library General Public 00008 License as published by the Free Software Foundation; either 00009 version 2 of the License, or (at your option) any later version. 00010 00011 This library is distributed in the hope that it will be useful, 00012 but WITHOUT ANY WARRANTY; without even the implied warranty of 00013 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00014 Library General Public License for more details. 00015 00016 You should have received a copy of the GNU Library General Public License 00017 along with this library; see the file COPYING.LIB. If not, write to 00018 the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, 00019 Boston, MA 02110-1301, USA. 00020 */ 00021 00022 #include "connection.h" 00023 #include "connection_p.h" 00024 00025 #include <errno.h> 00026 00027 #include <QQueue> 00028 #include <QPointer> 00029 #include <QTime> 00030 00031 #include <kdebug.h> 00032 #include <kcomponentdata.h> 00033 #include <kglobal.h> 00034 #include <klocale.h> 00035 #include <kstandarddirs.h> 00036 #include <ktemporaryfile.h> 00037 #include <kurl.h> 00038 00039 using namespace KIO; 00040 00041 class KIO::ConnectionPrivate 00042 { 00043 public: 00044 inline ConnectionPrivate() 00045 : backend(0), suspended(false) 00046 { } 00047 00048 void dequeue(); 00049 void commandReceived(const Task &task); 00050 void disconnected(); 00051 void setBackend(AbstractConnectionBackend *b); 00052 00053 QQueue<Task> outgoingTasks; 00054 QQueue<Task> incomingTasks; 00055 AbstractConnectionBackend *backend; 00056 Connection *q; 00057 bool suspended; 00058 }; 00059 00060 class KIO::ConnectionServerPrivate 00061 { 00062 public: 00063 inline ConnectionServerPrivate() 00064 : backend(0) 00065 { } 00066 00067 ConnectionServer *q; 00068 AbstractConnectionBackend *backend; 00069 }; 00070 00071 void ConnectionPrivate::dequeue() 00072 { 00073 if (!backend || suspended) 00074 return; 00075 00076 while (!outgoingTasks.isEmpty()) { 00077 const Task task = outgoingTasks.dequeue(); 00078 q->sendnow(task.cmd, task.data); 00079 } 00080 00081 if (!incomingTasks.isEmpty()) 00082 emit q->readyRead(); 00083 } 00084 00085 void ConnectionPrivate::commandReceived(const Task &task) 00086 { 00087 //kDebug() << this << "Command " << task.cmd << " added to the queue"; 00088 if (!suspended && incomingTasks.isEmpty()) 00089 QMetaObject::invokeMethod(q, "dequeue", Qt::QueuedConnection); 00090 incomingTasks.enqueue(task); 00091 } 00092 00093 void ConnectionPrivate::disconnected() 00094 { 00095 q->close(); 00096 QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection); 00097 } 00098 00099 void ConnectionPrivate::setBackend(AbstractConnectionBackend *b) 00100 { 00101 backend = b; 00102 if (backend) { 00103 q->connect(backend, SIGNAL(commandReceived(Task)), SLOT(commandReceived(Task))); 00104 q->connect(backend, SIGNAL(disconnected()), SLOT(disconnected())); 00105 backend->setSuspended(suspended); 00106 } 00107 } 00108 00109 AbstractConnectionBackend::AbstractConnectionBackend(QObject *parent) 00110 : QObject(parent), state(Idle) 00111 { 00112 } 00113 00114 AbstractConnectionBackend::~AbstractConnectionBackend() 00115 { 00116 } 00117 00118 SocketConnectionBackend::SocketConnectionBackend(Mode m, QObject *parent) 00119 : AbstractConnectionBackend(parent), socket(0), len(-1), cmd(0), 00120 signalEmitted(false), mode(m) 00121 { 00122 localServer = 0; 00123 //tcpServer = 0; 00124 } 00125 00126 SocketConnectionBackend::~SocketConnectionBackend() 00127 { 00128 if (mode == LocalSocketMode && localServer && 00129 localServer->localSocketType() == KLocalSocket::UnixSocket) 00130 QFile::remove(localServer->localPath()); 00131 } 00132 00133 void SocketConnectionBackend::setSuspended(bool enable) 00134 { 00135 if (state != Connected) 00136 return; 00137 Q_ASSERT(socket); 00138 Q_ASSERT(!localServer); // !tcpServer as well 00139 00140 if (enable) { 00141 //kDebug() << this << " suspending"; 00142 socket->setReadBufferSize(1); 00143 } else { 00144 //kDebug() << this << " resuming"; 00145 socket->setReadBufferSize(StandardBufferSize); 00146 if (socket->bytesAvailable() >= HeaderSize) { 00147 // there are bytes available 00148 QMetaObject::invokeMethod(this, "socketReadyRead", Qt::QueuedConnection); 00149 } 00150 00151 // We read all bytes here, but we don't use readAll() because we need 00152 // to read at least one byte (even if there isn't any) so that the 00153 // socket notifier is reenabled 00154 QByteArray data = socket->read(socket->bytesAvailable() + 1); 00155 for (int i = data.size(); --i >= 0; ) 00156 socket->ungetChar(data[i]); 00157 } 00158 } 00159 00160 bool SocketConnectionBackend::connectToRemote(const KUrl &url) 00161 { 00162 Q_ASSERT(state == Idle); 00163 Q_ASSERT(!socket); 00164 Q_ASSERT(!localServer); // !tcpServer as well 00165 00166 if (mode == LocalSocketMode) { 00167 KLocalSocket *sock = new KLocalSocket(this); 00168 QString path = url.path(); 00169 #if 0 00170 // TODO: Activate once abstract socket support is implemented in Qt. 00171 KLocalSocket::LocalSocketType type = KLocalSocket::UnixSocket; 00172 00173 if (url.queryItem(QLatin1String("abstract")) == QLatin1String("1")) 00174 type = KLocalSocket::AbstractUnixSocket; 00175 #endif 00176 sock->connectToPath(path); 00177 socket = sock; 00178 } else { 00179 socket = new QTcpSocket(this); 00180 socket->connectToHost(url.host(),url.port()); 00181 00182 if (!socket->waitForConnected(1000)) { 00183 state = Idle; 00184 kDebug() << "could not connect to " << url; 00185 return false; 00186 } 00187 } 00188 connect(socket, SIGNAL(readyRead()), SLOT(socketReadyRead())); 00189 connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected())); 00190 state = Connected; 00191 return true; 00192 } 00193 00194 void SocketConnectionBackend::socketDisconnected() 00195 { 00196 state = Idle; 00197 emit disconnected(); 00198 } 00199 00200 bool SocketConnectionBackend::listenForRemote() 00201 { 00202 Q_ASSERT(state == Idle); 00203 Q_ASSERT(!socket); 00204 Q_ASSERT(!localServer); // !tcpServer as well 00205 00206 if (mode == LocalSocketMode) { 00207 QString prefix = KStandardDirs::locateLocal("socket", KGlobal::mainComponent().componentName()); 00208 KTemporaryFile *socketfile = new KTemporaryFile(); 00209 socketfile->setPrefix(prefix); 00210 socketfile->setSuffix(QLatin1String(".slave-socket")); 00211 if (!socketfile->open()) 00212 { 00213 errorString = i18n("Unable to create io-slave: %1", strerror(errno)); 00214 delete socketfile; 00215 return false; 00216 } 00217 00218 QString sockname = socketfile->fileName(); 00219 KUrl addressUrl(sockname); 00220 addressUrl.setProtocol("local"); 00221 address = addressUrl.url(); 00222 delete socketfile; // can't bind if there is such a file 00223 00224 localServer = new KLocalSocketServer(this); 00225 if (!localServer->listen(sockname, KLocalSocket::UnixSocket)) { 00226 errorString = localServer->errorString(); 00227 delete localServer; 00228 localServer = 0; 00229 return false; 00230 } 00231 00232 connect(localServer, SIGNAL(newConnection()), SIGNAL(newConnection())); 00233 } else { 00234 tcpServer = new QTcpServer(this); 00235 tcpServer->listen(QHostAddress::LocalHost); 00236 if (!tcpServer->isListening()) { 00237 errorString = tcpServer->errorString(); 00238 delete tcpServer; 00239 tcpServer = 0; 00240 return false; 00241 } 00242 00243 address = "tcp://127.0.0.1:" + QString::number(tcpServer->serverPort()); 00244 connect(tcpServer, SIGNAL(newConnection()), SIGNAL(newConnection())); 00245 } 00246 00247 state = Listening; 00248 return true; 00249 } 00250 00251 bool SocketConnectionBackend::waitForIncomingTask(int ms) 00252 { 00253 Q_ASSERT(state == Connected); 00254 Q_ASSERT(socket); 00255 if (socket->state() != QAbstractSocket::ConnectedState) { 00256 state = Idle; 00257 return false; // socket has probably closed, what do we do? 00258 } 00259 00260 signalEmitted = false; 00261 if (socket->bytesAvailable()) 00262 socketReadyRead(); 00263 if (signalEmitted) 00264 return true; // there was enough data in the socket 00265 00266 // not enough data in the socket, so wait for more 00267 QTime timer; 00268 timer.start(); 00269 00270 while (socket->state() == QAbstractSocket::ConnectedState && !signalEmitted && 00271 (ms == -1 || timer.elapsed() < ms)) 00272 if (!socket->waitForReadyRead(ms == -1 ? -1 : ms - timer.elapsed())) 00273 break; 00274 00275 if (signalEmitted) 00276 return true; 00277 if (socket->state() != QAbstractSocket::ConnectedState) 00278 state = Idle; 00279 return false; 00280 } 00281 00282 bool SocketConnectionBackend::sendCommand(const Task &task) 00283 { 00284 Q_ASSERT(state == Connected); 00285 Q_ASSERT(socket); 00286 00287 static char buffer[HeaderSize + 2]; 00288 sprintf(buffer, "%6x_%2x_", task.data.size(), task.cmd); 00289 socket->write(buffer, HeaderSize); 00290 socket->write(task.data); 00291 00292 //kDebug() << this << " Sending command " << hex << task.cmd << " of " 00293 // << task.data.size() << " bytes (" << socket->bytesToWrite() 00294 // << " bytes left to write"; 00295 00296 // blocking mode: 00297 while (socket->bytesToWrite() > 0 && socket->state() == QAbstractSocket::ConnectedState) 00298 socket->waitForBytesWritten(-1); 00299 00300 return socket->state() == QAbstractSocket::ConnectedState; 00301 } 00302 00303 AbstractConnectionBackend *SocketConnectionBackend::nextPendingConnection() 00304 { 00305 Q_ASSERT(state == Listening); 00306 Q_ASSERT(localServer || tcpServer); 00307 Q_ASSERT(!socket); 00308 00309 //kDebug() << "Got a new connection"; 00310 00311 QTcpSocket *newSocket; 00312 if (mode == LocalSocketMode) 00313 newSocket = localServer->nextPendingConnection(); 00314 else 00315 newSocket = tcpServer->nextPendingConnection(); 00316 if (!newSocket) 00317 return 0; // there was no connection... 00318 00319 SocketConnectionBackend *result = new SocketConnectionBackend(Mode(mode)); 00320 result->state = Connected; 00321 result->socket = newSocket; 00322 newSocket->setParent(result); 00323 connect(newSocket, SIGNAL(readyRead()), result, SLOT(socketReadyRead())); 00324 connect(newSocket, SIGNAL(disconnected()), result, SLOT(socketDisconnected())); 00325 00326 return result; 00327 } 00328 00329 void SocketConnectionBackend::socketReadyRead() 00330 { 00331 bool shouldReadAnother; 00332 do { 00333 if (!socket) 00334 // might happen if the invokeMethods were delivered after we disconnected 00335 return; 00336 00337 // kDebug() << this << "Got " << socket->bytesAvailable() << " bytes"; 00338 if (len == -1) { 00339 // We have to read the header 00340 static char buffer[HeaderSize]; 00341 00342 if (socket->bytesAvailable() < HeaderSize) { 00343 return; // wait for more data 00344 } 00345 00346 socket->read(buffer, sizeof buffer); 00347 buffer[6] = 0; 00348 buffer[9] = 0; 00349 00350 char *p = buffer; 00351 while( *p == ' ' ) p++; 00352 len = strtol( p, 0L, 16 ); 00353 00354 p = buffer + 7; 00355 while( *p == ' ' ) p++; 00356 cmd = strtol( p, 0L, 16 ); 00357 00358 // kDebug() << this << " Beginning of command " << hex << cmd << " of size " 00359 // << len; 00360 } 00361 00362 QPointer<SocketConnectionBackend> that = this; 00363 00364 // kDebug() << this << "Want to read " << len << " bytes"; 00365 if (socket->bytesAvailable() >= len) { 00366 Task task; 00367 task.cmd = cmd; 00368 if (len) 00369 task.data = socket->read(len); 00370 len = -1; 00371 00372 signalEmitted = true; 00373 emit commandReceived(task); 00374 } else if (len > StandardBufferSize) { 00375 kDebug(7017) << this << "Jumbo packet of" << len << "bytes"; 00376 socket->setReadBufferSize(len + 1); 00377 } 00378 00379 // If we're dead, better don't try anything. 00380 if (that.isNull()) 00381 return; 00382 00383 // Do we have enough for an another read? 00384 if (len == -1) 00385 shouldReadAnother = socket->bytesAvailable() >= HeaderSize; 00386 else 00387 shouldReadAnother = socket->bytesAvailable() >= len; 00388 } 00389 while (shouldReadAnother); 00390 } 00391 00392 Connection::Connection(QObject *parent) 00393 : QObject(parent), d(new ConnectionPrivate) 00394 { 00395 d->q = this; 00396 } 00397 00398 Connection::~Connection() 00399 { 00400 close(); 00401 delete d; 00402 } 00403 00404 void Connection::suspend() 00405 { 00406 //kDebug() << this << "Suspended"; 00407 d->suspended = true; 00408 if (d->backend) 00409 d->backend->setSuspended(true); 00410 } 00411 00412 void Connection::resume() 00413 { 00414 // send any outgoing or incoming commands that may be in queue 00415 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection); 00416 00417 //kDebug() << this << "Resumed"; 00418 d->suspended = false; 00419 if (d->backend) 00420 d->backend->setSuspended(false); 00421 } 00422 00423 void Connection::close() 00424 { 00425 if (d->backend) { 00426 d->backend->disconnect(this); 00427 d->backend->deleteLater(); 00428 d->backend = 0; 00429 } 00430 d->outgoingTasks.clear(); 00431 d->incomingTasks.clear(); 00432 } 00433 00434 bool Connection::isConnected() const 00435 { 00436 return d->backend && d->backend->state == AbstractConnectionBackend::Connected; 00437 } 00438 00439 bool Connection::inited() const 00440 { 00441 return d->backend; 00442 } 00443 00444 bool Connection::suspended() const 00445 { 00446 return d->suspended; 00447 } 00448 00449 void Connection::connectToRemote(const QString &address) 00450 { 00451 //kDebug(7017) << "Connection requested to " << address; 00452 KUrl url = address; 00453 QString scheme = url.protocol(); 00454 00455 if (scheme == QLatin1String("local")) { 00456 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this)); 00457 } else if (scheme == QLatin1String("tcp")) { 00458 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this)); 00459 } else { 00460 kWarning(7017) << "Unknown requested KIO::Connection protocol='" << scheme 00461 << "' (" << address << ")"; 00462 Q_ASSERT(0); 00463 return; 00464 } 00465 00466 // connection succeeded 00467 if (!d->backend->connectToRemote(url)) { 00468 //kWarning(7017) << "could not connect to " << url << "using scheme" << scheme ; 00469 delete d->backend; 00470 d->backend = 0; 00471 return; 00472 } 00473 00474 d->dequeue(); 00475 } 00476 00477 QString Connection::errorString() const 00478 { 00479 if (d->backend) 00480 return d->backend->errorString; 00481 return QString(); 00482 } 00483 00484 bool Connection::send(int cmd, const QByteArray& data) 00485 { 00486 if (!inited() || !d->outgoingTasks.isEmpty()) { 00487 Task task; 00488 task.cmd = cmd; 00489 task.data = data; 00490 d->outgoingTasks.enqueue(task); 00491 return true; 00492 } else { 00493 return sendnow(cmd, data); 00494 } 00495 } 00496 00497 bool Connection::sendnow(int _cmd, const QByteArray &data) 00498 { 00499 if (data.size() > 0xffffff) 00500 return false; 00501 00502 if (!isConnected()) 00503 return false; 00504 00505 //kDebug() << this << "Sending command " << _cmd << " of size " << data.size(); 00506 Task task; 00507 task.cmd = _cmd; 00508 task.data = data; 00509 return d->backend->sendCommand(task); 00510 } 00511 00512 bool Connection::hasTaskAvailable() const 00513 { 00514 return !d->incomingTasks.isEmpty(); 00515 } 00516 00517 bool Connection::waitForIncomingTask(int ms) 00518 { 00519 if (!isConnected()) 00520 return false; 00521 00522 if (d->backend) 00523 return d->backend->waitForIncomingTask(ms); 00524 return false; 00525 } 00526 00527 int Connection::read( int* _cmd, QByteArray &data ) 00528 { 00529 // if it's still empty, then it's an error 00530 if (d->incomingTasks.isEmpty()) { 00531 //kWarning() << this << "Task list is empty!"; 00532 return -1; 00533 } 00534 const Task task = d->incomingTasks.dequeue(); 00535 //kDebug() << this << "Command " << task.cmd << " removed from the queue (size " 00536 // << task.data.size() << ")"; 00537 *_cmd = task.cmd; 00538 data = task.data; 00539 00540 // if we didn't empty our reading queue, emit again 00541 if (!d->suspended && !d->incomingTasks.isEmpty()) 00542 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection); 00543 00544 return data.size(); 00545 } 00546 00547 ConnectionServer::ConnectionServer(QObject *parent) 00548 : QObject(parent), d(new ConnectionServerPrivate) 00549 { 00550 d->q = this; 00551 } 00552 00553 ConnectionServer::~ConnectionServer() 00554 { 00555 delete d; 00556 } 00557 00558 void ConnectionServer::listenForRemote() 00559 { 00560 #ifdef Q_WS_WIN 00561 d->backend = new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this); 00562 #else 00563 d->backend = new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this); 00564 #endif 00565 if (!d->backend->listenForRemote()) { 00566 delete d->backend; 00567 d->backend = 0; 00568 return; 00569 } 00570 00571 connect(d->backend, SIGNAL(newConnection()), SIGNAL(newConnection())); 00572 kDebug(7017) << "Listening on " << d->backend->address; 00573 } 00574 00575 QString ConnectionServer::address() const 00576 { 00577 if (d->backend) 00578 return d->backend->address; 00579 return QString(); 00580 } 00581 00582 bool ConnectionServer::isListening() const 00583 { 00584 return d->backend && d->backend->state == AbstractConnectionBackend::Listening; 00585 } 00586 00587 void ConnectionServer::close() 00588 { 00589 delete d->backend; 00590 d->backend = 0; 00591 } 00592 00593 Connection *ConnectionServer::nextPendingConnection() 00594 { 00595 if (!isListening()) 00596 return 0; 00597 00598 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection(); 00599 if (!newBackend) 00600 return 0; // no new backend... 00601 00602 Connection *result = new Connection; 00603 result->d->setBackend(newBackend); 00604 newBackend->setParent(result); 00605 00606 return result; 00607 } 00608 00609 void ConnectionServer::setNextPendingConnection(Connection *conn) 00610 { 00611 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection(); 00612 Q_ASSERT(newBackend); 00613 00614 conn->d->backend = newBackend; 00615 conn->d->setBackend(newBackend); 00616 newBackend->setParent(conn); 00617 00618 conn->d->dequeue(); 00619 } 00620 00621 #include "connection_p.moc" 00622 #include "connection.moc"
KDE 4.7 API Reference