kdecore Library API Documentation

kresolvermanager.cpp

00001 /*  -*- C++ -*-
00002  *  Copyright (C) 2003 Thiago Macieira <thiago.macieira@kdemail.net>
00003  *
00004  *
00005  *  Permission is hereby granted, free of charge, to any person obtaining
00006  *  a copy of this software and associated documentation files (the
00007  *  "Software"), to deal in the Software without restriction, including
00008  *  without limitation the rights to use, copy, modify, merge, publish,
00009  *  distribute, sublicense, and/or sell copies of the Software, and to
00010  *  permit persons to whom the Software is furnished to do so, subject to
00011  *  the following conditions:
00012  *
00013  *  The above copyright notice and this permission notice shall be included 
00014  *  in all copies or substantial portions of the Software.
00015  *
00016  *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
00017  *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
00018  *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
00019  *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
00020  *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
00021  *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
00022  *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
00023  */
00024 
00025 #include "config.h"
00026 
00027 #include <sys/types.h>
00028 #include <netinet/in.h>
00029 #include <limits.h>
00030 #include <unistd.h>     // only needed for pid_t
00031 
00032 #ifdef HAVE_RES_INIT
00033 # include <sys/stat.h>
00034 # include <resolv.h>
00035 #endif
00036 
00037 #include <qapplication.h>
00038 #include <qstring.h>
00039 #include <qcstring.h>
00040 #include <qptrlist.h>
00041 #include <qtimer.h>
00042 #include <qmutex.h>
00043 #include <qthread.h>
00044 #include <qwaitcondition.h>
00045 #include <qsemaphore.h>
00046 
00047 #include "kresolver.h"
00048 #include "kresolver_p.h"
00049 #include "kresolverworkerbase.h"
00050 #include "kresolverstandardworkers_p.h"
00051 
00052 using namespace KNetwork;
00053 using namespace KNetwork::Internal;
00054 
00055 /*
00056  * Explanation on how the resolver system works
00057 
00058    When KResolver::start is called, it calls KResolverManager::enqueue to add
00059    an entry to the queue. KResolverManager::enqueue will verify the availability
00060    of a worker thread: if one is available, it will dispatch the request to it.
00061    If no threads are available, it will then decide whether to launch a thread
00062    or to queue for the future.
00063 
00064    (This process is achieved by always queueing the new request, starting a
00065    new thread if necessary and then notifying of the availability of data
00066    to all worker threads).
00067 
00068  * Worker thread
00069    A new thread, when started, will enter its event loop
00070    immediately. That is, it'll first try to acquire new data to
00071    process, which means it will lock and unlock the manager mutex in
00072    the process.
00073 
00074    If it finds no new data, it'll wait on the feedWorkers condition
00075    for a certain maximum time. If that time expires and there's still
00076    no data, the thread will exit, in order to save system resources.
00077 
00078    If it finds data, however, it'll set up and call the worker class
00079    that has been selected by the manager. Once that worker is done,
00080    the thread releases the data through KResolverManager::releaseData.
00081 
00082  * Data requesting/releasing
00083    A worker thread always calls upon functions on the resolver manager
00084    in order to acquire and release data.
00085 
00086    When data is being requested, the KResolverManager::requestData
00087    function will look the currentRequests list and return the first
00088    Queued request it finds, while marking it to be InProgress.
00089 
00090    When the worker class has returned, the worker thread will release
00091    that data through the KResolverManager::releaseData function. If the
00092    worker class has requested no further data (nRequests == 0), the
00093    request's status is marked to be Done. It'll then look at the
00094    requestor for that data: if it was requested by another worker,
00095    it'll decrement the requests count for that one and add the results
00096    to a list. And, finally, if the requests count for the requestor
00097    becomes 0, it'll repeat this process for the requestor as well
00098    (change status to Done, check for a requestor).
00099  */
00100 
00101 namespace
00102 {
00103 
00104 /*
00105  * This class is used to control the access to the
00106  * system's resolver API.
00107  *
00108  * It is necessary to periodically poll /etc/resolv.conf and reload
00109  * it if any changes are noticed. This class does exactly that.
00110  *
00111  * However, there's also the problem of reloading the structure while
00112  * some threads are in progress. Therefore, we keep a usage reference count.
00113  */
00114 class ResInitUsage
00115 {
00116 #ifdef HAVE_RES_INIT
00117   time_t mTime;
00118   QWaitCondition cond;
00119   QMutex mutex;
00120   int useCount;
00121 
00122   bool shouldResInit()
00123   {
00124     // check that /etc/resolv.conf has changed 
00125     struct stat st;
00126     if (stat("/etc/resolv.conf", &st) != 0)
00127       return false;
00128     
00129     if (mTime < st.st_mtime)
00130       {
00131     //qDebug("ResInitUsage: /etc/resolv.conf updated");
00132     return true;
00133       }
00134     return false;
00135   }
00136 
00137   void reResInit()
00138   {
00139     //qDebug("ResInitUsage: calling res_init()");
00140     res_init();
00141     
00142     struct stat st;
00143     if (stat("/etc/resolv.conf", &st) == 0)
00144       mTime = st.st_mtime;
00145   }
00146 
00147 public:
00148   ResInitUsage()
00149     : mTime(0), useCount(0)
00150   { }
00151 
00152   /*
00153    * Marks the end of usage to the resolver tools
00154    */
00155   void operator--(int)
00156   {
00157     mutex.lock();
00158     if (--useCount == 0)
00159       // we've reached 0, wake up anyone that's waiting to call res_init
00160       cond.wakeAll(); 
00161     mutex.unlock();
00162   }
00163 
00164   /*
00165    * Marks the beginning of usage of the resolver API
00166    */
00167   void operator++(int)
00168   {
00169     mutex.lock();
00170 
00171     if (shouldResInit())
00172       {
00173     if (useCount)
00174       {
00175         // other threads are already using the API, so wait till
00176         // it's all clear
00177         //qDebug("ResInitUsage: waiting for libresolv to be clear");
00178         cond.wait(&mutex);
00179       }
00180     reResInit();
00181       }
00182     useCount++;
00183     mutex.unlock();
00184   }
00185 
00186 #else
00187 public:
00188   ResInitUsage()
00189   { }
00190 
00191   void operator--(int)
00192   { }
00193 
00194   void operator++(int)
00195   { }
00196 #endif
00197 
00198 } resInit;
00199 
00200 /*
00201  * parameters
00202  */
00203 // a thread will try maxThreadRetries to get data, waiting at most
00204 // maxThreadWaitTime milliseconds between each attempt. After that, it'll
00205 // exit
00206 static const int maxThreadWaitTime = 20000; // 20 seconds
00207 static const int maxThreads = 5;
00208 
00209 static pid_t pid;       // FIXME -- disable when everything is ok
00210 
00211 KResolverThread::KResolverThread()
00212   : data(0L)
00213 {
00214 }
00215 
00216 // remember! This function runs in a separate thread!
00217 void KResolverThread::run()
00218 {
00219   // initialisation
00220   // enter the loop already
00221 
00222   //qDebug("KResolverThread(thread %u/%p): started", pid, (void*)QThread::currentThread());
00223   KResolverManager::manager()->registerThread(this);
00224   while (true)
00225     {
00226       data = KResolverManager::manager()->requestData(this, ::maxThreadWaitTime);
00227       //qDebug("KResolverThread(thread %u/%p) got data %p", KResolverManager::pid, 
00228       //       (void*)QThread::currentThread(), (void*)data);
00229       if (data)
00230     {
00231       // yes, we got data
00232       // process it!
00233       
00234       // 1) set up
00235       ;
00236       
00237       // 2) run it
00238       data->worker->run();
00239       
00240       // 3) release data
00241       KResolverManager::manager()->releaseData(this, data);
00242       
00243       // now go back to the loop
00244     }
00245       else
00246     break;
00247     }
00248 
00249   KResolverManager::manager()->unregisterThread(this);
00250   //qDebug("KResolverThread(thread %u/%p): exiting", pid, (void*)QThread::currentThread());
00251 }
00252 
00253 static KResolverManager *globalManager;
00254 
00255 KResolverManager* KResolverManager::manager()
00256 {
00257   if (globalManager == 0L)
00258     new KResolverManager();
00259   return globalManager;
00260 }
00261 
00262 KResolverManager::KResolverManager()
00263   : runningThreads(0), availableThreads(0)
00264 {
00265   globalManager = this;
00266   workers.setAutoDelete(true);
00267   currentRequests.setAutoDelete(true);
00268   initStandardWorkers();
00269 
00270   pid = getpid();
00271 }
00272 
00273 KResolverManager::~KResolverManager()
00274 {
00275   // this should never be called
00276 
00277   // kill off running threads
00278   for (workers.first(); workers.current(); workers.next())
00279     workers.current()->terminate();
00280 }
00281 
00282 void KResolverManager::registerThread(KResolverThread* )
00283 {
00284 }
00285 
00286 void KResolverManager::unregisterThread(KResolverThread*)
00287 {
00288 }
00289 
00290 // this function is called by KResolverThread::run
00291 RequestData* KResolverManager::requestData(KResolverThread *th, int maxWaitTime)
00292 {
00294   // This function is called in a worker thread!!
00296 
00297   resInit++;
00298 
00299   // lock the mutex, so that the manager thread or other threads won't
00300   // interfere.
00301   QMutexLocker locker(&mutex);
00302   RequestData *data = findData(th);
00303 
00304   if (data)
00305     // it found something, that's good
00306     return data;
00307 
00308   // nope, nothing found; sleep for a while
00309   availableThreads++;
00310   feedWorkers.wait(&mutex, maxWaitTime);
00311   availableThreads--;
00312 
00313   data = findData(th);
00314   if (data == 0L)
00315     {
00316       // if we could find no data, this thread will exit
00317       runningThreads--;
00318       resInit--;
00319     }
00320   return data;
00321 }
00322 
00323 RequestData* KResolverManager::findData(KResolverThread* th)
00324 {
00326   // This function is called by @ref requestData above and must
00327   // always be called with a locked mutex
00329 
00330   // now find data to be processed
00331   for (RequestData *curr = newRequests.first(); curr; curr = newRequests.next())
00332     if (!curr->worker->m_finished)
00333       {
00334     // found one
00335     if (curr->obj)
00336       curr->obj->status = KResolver::InProgress;
00337     curr->worker->th = th;
00338 
00339     // move it to the currentRequests list
00340     currentRequests.append(newRequests.take());
00341 
00342     return curr;
00343       }
00344 
00345   // found nothing!
00346   return 0L;
00347 }
00348 
00349 // this function is called by KResolverThread::run
00350 void KResolverManager::releaseData(KResolverThread *, RequestData* data)
00351 {
00353   // This function is called in a worker thread!!
00355 
00356   resInit--;
00357 
00358   //qDebug("KResolverManager::releaseData(%u/%p): %p has been released", pid, 
00359 //   (void*)QThread::currentThread(), (void*)data);
00360 
00361   if (data->obj)
00362     {
00363       if (data->nRequests > 0)
00364     // PostProcessing means "we're done with our blocking stuff, but we're waiting
00365     // for some child request to finish"
00366     data->obj->status = KResolver::PostProcessing;  
00367       else
00368     // this may change after post-processing
00369     data->obj->status = data->worker->results.isEmpty() ? KResolver::Failed : KResolver::Success;
00370     }
00371       
00372   data->worker->m_finished = true;
00373   data->worker->th = 0L;    // this releases the object
00374 
00375   // handle finished requests
00376   handleFinished();
00377 }
00378 
00379 // this function is called by KResolverManager::releaseData above
00380 void KResolverManager::handleFinished()
00381 {  
00382   bool redo = false;
00383   QPtrQueue<RequestData> doneRequests;
00384 
00385   mutex.lock();
00386 
00387   // loop over all items on the currently running list
00388   // we loop from the last to the first so that we catch requests with "requestors" before
00389   // we catch the requestor itself.
00390   RequestData *curr = currentRequests.last();
00391   while (curr)
00392     {
00393       if (curr->worker->th == 0L)
00394     {
00395       if (handleFinishedItem(curr))
00396         {
00397           doneRequests.enqueue(currentRequests.take());
00398           if (curr->requestor &&
00399           curr->requestor->nRequests == 0 && 
00400           curr->requestor->worker->m_finished)
00401         // there's a requestor that is now finished
00402         redo = true;
00403         }
00404     }
00405       
00406       curr = currentRequests.prev();
00407     }
00408       
00409   //qDebug("KResolverManager::handleFinished(%u): %d requests to notify", pid, doneRequests.count());
00410   while (RequestData *d = doneRequests.dequeue())
00411     doNotifying(d);
00412 
00413   mutex.unlock();
00414 
00415   if (redo)
00416     {
00417       //qDebug("KResolverManager::handleFinished(%u): restarting processing to catch requestor",
00418     //     pid);
00419       handleFinished();
00420     }
00421 }
00422 
00423 // This function is called by KResolverManager::handleFinished above
00424 bool KResolverManager::handleFinishedItem(RequestData* curr)
00425                       
00426 {
00427   // for all items that aren't currently running, remove from the list
00428   // this includes all finished or cancelled requests
00429 
00430   if (curr->worker->m_finished && curr->nRequests == 0)
00431     {
00432       // this one has finished
00433       if (curr->obj)
00434     curr->obj->status = KResolver::Success; // this may change after the post-processing
00435 
00436       if (curr->requestor)
00437     --curr->requestor->nRequests;
00438 
00439       //qDebug("KResolverManager::handleFinishedItem(%u): removing %p since it's done",
00440     //     pid, (void*)curr);
00441       return true;
00442     }
00443   return false;
00444 }
00445 
00446 
00447 
00448 void KResolverManager::registerNewWorker(KResolverWorkerFactoryBase *factory)
00449 {
00450   workerFactories.append(factory);
00451 }
00452 
00453 KResolverWorkerBase* KResolverManager::findWorker(KResolverPrivate* p)
00454 {
00456   // this function can be called on any user thread
00458 
00459   // this function is called with an unlocked mutex and it's expected to be 
00460   // thread-safe!
00461   // but the factory list is expected not to be changed asynchronously
00462 
00463   // This function is responsible for finding a suitable worker for the given
00464   // input. That means we have to do a costly operation to create each worker
00465   // class and call their preprocessing functions. The first one that
00466   // says they can process (i.e., preprocess() returns true) will get the job.
00467 
00468   KResolverWorkerBase *worker;
00469   for (KResolverWorkerFactoryBase *factory = workerFactories.first(); factory; 
00470        factory = workerFactories.next())
00471     {
00472       worker = factory->create();
00473 
00474       // set up the data the worker needs to preprocess
00475       worker->input = &p->input;
00476 
00477       if (worker->preprocess())
00478     {
00479       // good, this one says it can process
00480       if (worker->m_finished)      
00481         p->status = !worker->results.isEmpty() ?
00482           KResolver::Success : KResolver::Failed;
00483       else
00484         p->status = KResolver::Queued;
00485       return worker;
00486     }
00487 
00488       // no, try again
00489       delete worker;
00490     }
00491 
00492   // found no worker
00493   return 0L;
00494 }
00495 
00496 void KResolverManager::doNotifying(RequestData *p)
00497 {
00499   // This function may be called on any thread
00500   // any thread at all: user threads, GUI thread, manager thread or worker thread
00502 
00503   // Notification and finalisation
00504   //
00505   // Once a request has finished the normal processing, we call the
00506   // post processing function.
00507   //
00508   // After that is done, we will consolidate all results in the object's
00509   // KResolverResults and then post an event indicating that the signal
00510   // be emitted
00511   //
00512   // In case we detect that the object is waiting for completion, we do not
00513   // post the event, for KResolver::wait will take care of emitting the
00514   // signal.
00515   //
00516   // Once we release the mutex on the object, we may no longer reference it
00517   // for it might have been deleted.
00518 
00519   // "User" objects are those that are not created by the manager. Note that
00520   // objects created by worker threads are considered "user" objects. Objects
00521   // created by the manager are those created for KResolver::resolveAsync.
00522   // We should delete them.
00523 
00524   if (p->obj)
00525     {
00526       // lock the object
00527       p->obj->mutex.lock();
00528       KResolver* parent = p->obj->parent; // is 0 for non-"user" objects
00529       KResolverResults& r = p->obj->results;
00530 
00531       if (p->obj->status == KResolver::Canceled)
00532     {
00533       p->obj->status = KResolver::Canceled;
00534       p->obj->errorcode = KResolver::Canceled;
00535       p->obj->syserror = 0;
00536       r.setError(KResolver::Canceled, 0);
00537     }
00538       else if (p->worker)
00539     {
00540       // post processing
00541       p->worker->postprocess(); // ignore the result
00542 
00543       // copy the results from the worker thread to the final
00544       // object
00545       r = p->worker->results;
00546 
00547       // reset address
00548       r.setAddress(p->input->node, p->input->service);
00549 
00550       //qDebug("KResolverManager::doNotifying(%u/%p): for %p whose status is %d and has %d results", 
00551          //pid, (void*)QThread::currentThread(), (void*)p, p->obj->status, r.count());
00552 
00553       p->obj->errorcode = r.error();
00554       p->obj->syserror = r.systemError();
00555       p->obj->status = !r.isEmpty() ? 
00556         KResolver::Success : KResolver::Failed;
00557     }
00558       else
00559     {
00560       r.empty();
00561       r.setError(p->obj->errorcode, p->obj->syserror);
00562     }
00563 
00564       // check whether there's someone waiting
00565       if (!p->obj->waiting && parent)
00566     // no, so we must post an event requesting that the signal be emitted
00567     // sorry for the C-style cast, but neither static nor reintepret cast work
00568     // here; I'd have to do two casts
00569     QApplication::postEvent(parent, new QEvent((QEvent::Type)(ResolutionCompleted)));
00570 
00571       // release the mutex
00572       p->obj->mutex.unlock();
00573     }
00574   else
00575     {
00576       // there's no object!
00577       if (p->worker)
00578     p->worker->postprocess();
00579     }
00580 
00581   delete p->worker;
00582 
00583   // ignore p->requestor and p->nRequests
00584   // they have been dealt with by the main loop
00585 
00586   delete p;
00587 
00588   // notify any objects waiting in KResolver::wait
00589   notifyWaiters.wakeAll();
00590 }
00591 
00592 // enqueue a new request
00593 // this function is called from KResolver::start and 
00594 // from KResolverWorkerBase::enqueue
00595 void KResolverManager::enqueue(KResolver *obj, RequestData *requestor)
00596 {
00597   RequestData *newrequest = new RequestData;
00598   newrequest->nRequests = 0;
00599   newrequest->obj = obj->d;
00600   newrequest->input = &obj->d->input;
00601   newrequest->requestor = requestor;
00602 
00603   // when processing a new request, find the most
00604   // suitable worker
00605   if ((newrequest->worker = findWorker(obj->d)) == 0L)
00606     {
00607       // oops, problem
00608       // cannot find a worker class for this guy
00609       obj->d->status = KResolver::Failed;
00610       obj->d->errorcode = KResolver::UnsupportedFamily;
00611       obj->d->syserror = 0;
00612 
00613       doNotifying(newrequest);
00614       return;
00615     }
00616 
00617   // no, queue it
00618   // p->status was set in findWorker!
00619   if (requestor)
00620     requestor->nRequests++;
00621 
00622   if (!newrequest->worker->m_finished)
00623     dispatch(newrequest);
00624   else if (newrequest->nRequests > 0)
00625     {
00626       mutex.lock();
00627       currentRequests.append(newrequest);
00628       mutex.unlock();
00629     }
00630   else
00631     // already done
00632     doNotifying(newrequest);
00633 }
00634 
00635 // a new request has been created
00636 // dispatch it
00637 void KResolverManager::dispatch(RequestData *data)
00638 {
00639   // As stated in the beginning of the file, this function
00640   // is supposed to verify the availability of threads, start
00641   // any if necessary
00642 
00643   QMutexLocker locker(&mutex);
00644 
00645   // add to the queue
00646   newRequests.append(data);
00647 
00648   // check if we need to start a new thread
00649   //
00650   // we depend on the variables availableThreads and runningThreads to
00651   // know if we are supposed to start any threads:
00652   // - if availableThreads > 0, then there is at least one thread waiting,
00653   //    blocked in KResolverManager::requestData. It can't unblock
00654   //    while we are holding the mutex locked, therefore we are sure that
00655   //    our event will be handled
00656   // - if availableThreads == 0:
00657   //   - if runningThreads < maxThreads
00658   //     we will start a new thread, which will certainly block in
00659   //     KResolverManager::requestData because we are holding the mutex locked
00660   //   - if runningThreads == maxThreads
00661   //     This situation generally means that we have already maxThreads running
00662   //     and that all of them are processing. We will not start any new threads,
00663   //     but will instead wait for one to finish processing and request new data
00664   //
00665   //     There's a possible race condition here, which goes unhandled: if one of
00666   //     threads has timed out waiting for new data and is in the process of
00667   //     exiting. In that case, availableThreads == 0 and runningThreads will not
00668   //     have decremented yet. This means that we will not start a new thread
00669   //     that we could have. However, since there are other threads working, our
00670   //     event should be handled soon.
00671   //     It won't be handled if and only if ALL threads are in the process of 
00672   //     exiting. That situation is EXTREMELY unlikely and is not handled either.
00673   //
00674   if (availableThreads == 0 && runningThreads < maxThreads)
00675     {
00676       // yes, a new thread should be started
00677 
00678       // find if there's a finished one
00679       KResolverThread *th = workers.first();
00680       while (th && th->running())
00681     th = workers.next();
00682 
00683       if (th == 0L)
00684     // no, create one
00685     th = new KResolverThread;
00686       else
00687     workers.take();
00688 
00689       th->start();
00690       workers.append(th);
00691       runningThreads++;
00692     }
00693 
00694   feedWorkers.wakeAll();
00695 
00696   // clean up idle threads
00697   workers.first();
00698   while (workers.current())
00699     {
00700       if (!workers.current()->running())
00701     workers.remove();
00702       else
00703     workers.next();
00704     }
00705 }
00706 
00707 // this function is called by KResolverManager::dequeue
00708 bool KResolverManager::dequeueNew(KResolver* obj)
00709 {
00710   // This function must be called with a locked mutex
00711   // Deadlock warning:
00712   // always lock the global mutex first if both mutexes must be locked
00713 
00714   KResolverPrivate *d = obj->d;
00715 
00716   // check if it's in the new request list
00717   RequestData *curr = newRequests.first(); 
00718   while (curr)
00719     if (curr->obj == d)
00720       {
00721     // yes, this object is still in the list
00722     // but it has never been processed
00723     d->status = KResolver::Canceled;
00724     d->errorcode = KResolver::Canceled;
00725     d->syserror = 0;
00726     newRequests.take();
00727 
00728     delete curr->worker;
00729     delete curr;
00730     
00731     return true;
00732       }
00733     else
00734       curr = newRequests.next();
00735 
00736   // check if it's running
00737   curr = currentRequests.first();
00738   while (curr)
00739     if (curr->obj == d)
00740       {
00741     // it's running. We cannot simply take it out of the list.
00742     // it will be handled when the thread that is working on it finishes
00743     d->mutex.lock();
00744 
00745     d->status = KResolver::Canceled;
00746     d->errorcode = KResolver::Canceled;
00747     d->syserror = 0;
00748 
00749     // disengage from the running threads
00750     curr->obj = 0L;
00751     curr->input = 0L;
00752     if (curr->worker)
00753       curr->worker->input = 0L;
00754 
00755     d->mutex.unlock();
00756       }
00757     else
00758       curr = currentRequests.next();
00759 
00760   return false;
00761 }
00762 
00763 // this function is called by KResolver::cancel
00764 // it's expected to be thread-safe
00765 void KResolverManager::dequeue(KResolver *obj)
00766 {
00767   QMutexLocker locker(&mutex);
00768   dequeueNew(obj);
00769 }
00770 
00771 } // anonymous namespace
KDE Logo
This file is part of the documentation for kdecore Library Version 3.3.1.
Documentation copyright © 1996-2004 the KDE developers.
Generated on Fri Feb 18 15:10:00 2005 by doxygen 1.3.9.1 written by Dimitri van Heesch, © 1997-2003