CxPool.hxx

00001 /*
00002 ** Stechec project is free software; you can redistribute it and/or modify
00003 ** it under the terms of the GNU General Public License as published by
00004 ** the Free Software Foundation; either version 2 of the License, or
00005 ** (at your option) any later version.
00006 **
00007 ** The complete GNU General Public Licence Notice can be found as the
00008 ** `NOTICE' file in the root directory.
00009 **
00010 ** Copyright (C) 2007 Prologin
00011 */
00012 
00013 #include <sys/select.h>
00014 #include <sys/poll.h>
00015 #include <cerrno>
00016 
00017 #include "logger/log.hh"
00018 #include "Cx.hh"
00019 #include "CxPool.hh"
00020 
00021 template <typename T>
00022 CxPool<T>::CxPool(int timeout)
00023   : timeout_(timeout),
00024     lock_(NULL)
00025 {
00026 #if HAVE_EPOLL
00027   epoll_fd_ = epoll_create(10);
00028   if (epoll_fd_ < 0)
00029     throw NetError("Poll");
00030 #else
00031   ev_size_ = 12;
00032   ev_ = new struct pollfd[ev_size_];
00033 #endif
00034 }
00035 
00036 template <typename T>
00037 CxPool<T>::~CxPool()
00038 {
00039 #if HAVE_EPOLL
00040   close(epoll_fd_);
00041 #endif
00042 }
00043 
00044 template <typename T>
00045 void CxPool<T>::setLock(pthread_mutex_t* lock)
00046 {
00047   lock_ = lock;
00048 }
00049 
00050 
00051 template <typename T>
00052 void CxPool<T>::addElt(T* elt)
00053 {
00054   int fd = elt->fd_;
00055 
00056   if (elt_list_.count(fd) > 0)
00057     WARN("elt with fd `%1' already set", fd);
00058   assert(fd >= 0);
00059 
00060   elt_list_[fd] = elt;
00061 
00062 #if HAVE_EPOOL
00063   struct epoll_event ev;
00064 
00065   ev.events = EPOLLIN;
00066   ev.data.fd = fd;
00067   if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev))
00068     throw NetError("epoll_ctl");
00069 #endif
00070 }
00071 
00072 // 'fd' must still be valid.
00073 template <typename T>
00074 void CxPool<T>::removeElt(T* elt)
00075 {
00076   int fd = elt->fd_;
00077 
00078   if (elt_list_.count(fd) == 0)
00079     {
00080       WARN("trying to remove non-existing element (fd `%1')", fd);
00081       return;
00082     }
00083   elt_list_.erase(fd);
00084 
00085 #if HAVE_EPOOL
00086   struct epoll_event ev;
00087   int fd = elt->getFd();
00088 
00089   ev.events = EPOLLIN;
00090   ev.data.fd = fd;
00091   if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &ev))
00092     throw NetError("epoll_ctl");
00093 #endif
00094 }
00095 
00096 
00097 template <typename T>
00098 int CxPool<T>::size() const
00099 {
00100   return elt_list_.size();
00101 }
00102 
00103 template <typename T>
00104 void CxPool<T>::flush()
00105 {
00106   elt_list_.clear();
00107 }
00108 
00109 template <typename T>
00110 const typename CxPool<T>::EltList& CxPool<T>::poll()
00111 {
00112   int nb_ready;
00113 
00114   ready_list_.clear();
00115 
00116 #if HAVE_EPOOL
00117   struct epoll_event ev[32];
00118 
00119   pthread_mutex_unlock(lock_);
00120   nb_ready = epoll_wait(epoll_fd_, &ev, 32, timeout_);
00121   pthread_mutex_lock(lock_);
00122 
00123   if (nb_ready < 0 && errno != EINTR)
00124     throw NetError("epoll");
00125   if (nb_ready == 0)
00126     return;
00127 
00128   for (int i = 0; i < nb_ready; i++)
00129     {
00130       int fd = ev[i].data.fd;
00131       ready_list_.push_back(std::make_pair(1, elt_list_[fd]));
00132     }
00133 
00134 #else
00135 
00136   int elt_size = elt_list_.size();
00137   InternalEltIter it;
00138   int i;
00139 
00140   // elt_list_ size may have changed outside.
00141   if (ev_size_ < elt_size)
00142     {
00143       ev_size_ = elt_size * 2;
00144       delete[] ev_;
00145       ev_ = new struct pollfd[ev_size_];
00146     }
00147 
00148   // Fill ev_ with file descriptors.
00149   for (it = elt_list_.begin(), i = 0; it != elt_list_.end(); ++it, i++)
00150     {
00151       T* cx = it->second;
00152 
00153       if ((cx->state_ & CX_READ) && cx->arrangeReadBuffer())
00154         {
00155           // There is already some data to read. Push it on ready list.
00156           ready_list_.push_back(std::make_pair(E_FD_READ_READY, cx));
00157         }
00158       else if (cx->state_ & (CX_READ | CX_LISTENING))
00159         {
00160           ev_[i].events = POLLIN;
00161           ev_[i].fd = cx->fd_;
00162           ev_[i].revents = 0;
00163         }
00164       else
00165         {
00166           // We only support read or listening sockets. Connecting and
00167           // closed sockets are not intended to enter in the pool.
00168           // Report it as closed, caller _has to_ remove it from list.
00169           ready_list_.push_back(std::make_pair(E_FD_CONNECTION_CLOSED, cx));
00170           elt_size--;
00171         }
00172     }
00173 
00174   // No need to wait if some elements are ready
00175   if (!ready_list_.empty())
00176     {
00177       LOG1("ready list not empty: %1", ready_list_.size()); 
00178       return ready_list_;
00179     }
00180 
00181   if (lock_)
00182     pthread_mutex_unlock(lock_);
00183   nb_ready = ::poll(ev_, elt_size, timeout_);
00184   if (lock_)
00185     pthread_mutex_lock(lock_);
00186 
00187   if (nb_ready < 0 && errno != EINTR)
00188     throw NetSysError("poll");
00189   if (nb_ready == 0)
00190     return ready_list_;
00191 
00192   for (int i = 0; i < elt_size; ++i)
00193     if ((ev_[i].revents & POLLIN))
00194       {
00195         T* cx = elt_list_[ev_[i].fd];
00196         
00197         // Read data once from fd.
00198         if (cx->state_ & CX_READ)
00199           {
00200             cx->arrangeReadBuffer();
00201             int ret = cx->recvData(false);
00202             if (ret == -1)
00203               {
00204                 ready_list_.push_back(std::make_pair(E_FD_CONNECTION_CLOSED,
00205                                                      elt_list_[ev_[i].fd]));
00206               }
00207             else
00208               {
00209                 cx->buff_recv_tail_ += ret;
00210                 if (cx->arrangeReadBuffer())
00211                   ready_list_.push_back(std::make_pair(E_FD_READ_READY,
00212                                                        elt_list_[ev_[i].fd]));
00213               }
00214           }
00215         else if (cx->state_ & CX_LISTENING)
00216           {
00217             ready_list_.push_back(std::make_pair(E_FD_CONNECTION_PENDING,
00218                                                  elt_list_[ev_[i].fd]));
00219           }
00220         else
00221           {
00222             // BUG: should not have socket closed for reading.
00223             WARN("bug spotted in CxPool::poll");
00224           }
00225       }
00226 
00227 #endif
00228 
00229   return ready_list_;
00230 }

Generated on Sat Jun 23 16:07:23 2007 for Stechec/TBT by  doxygen 1.4.7