00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013 #include <sys/select.h>
00014 #include <sys/poll.h>
00015 #include <cerrno>
00016
00017 #include "logger/log.hh"
00018 #include "Cx.hh"
00019 #include "CxPool.hh"
00020
00021 template <typename T>
00022 CxPool<T>::CxPool(int timeout)
00023 : timeout_(timeout),
00024 lock_(NULL)
00025 {
00026 #if HAVE_EPOLL
00027 epoll_fd_ = epoll_create(10);
00028 if (epoll_fd_ < 0)
00029 throw NetError("Poll");
00030 #else
00031 ev_size_ = 12;
00032 ev_ = new struct pollfd[ev_size_];
00033 #endif
00034 }
00035
00036 template <typename T>
00037 CxPool<T>::~CxPool()
00038 {
00039 #if HAVE_EPOLL
00040 close(epoll_fd_);
00041 #endif
00042 }
00043
00044 template <typename T>
00045 void CxPool<T>::setLock(pthread_mutex_t* lock)
00046 {
00047 lock_ = lock;
00048 }
00049
00050
00051 template <typename T>
00052 void CxPool<T>::addElt(T* elt)
00053 {
00054 int fd = elt->fd_;
00055
00056 if (elt_list_.count(fd) > 0)
00057 WARN("elt with fd `%1' already set", fd);
00058 assert(fd >= 0);
00059
00060 elt_list_[fd] = elt;
00061
00062 #if HAVE_EPOOL
00063 struct epoll_event ev;
00064
00065 ev.events = EPOLLIN;
00066 ev.data.fd = fd;
00067 if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev))
00068 throw NetError("epoll_ctl");
00069 #endif
00070 }
00071
00072
00073 template <typename T>
00074 void CxPool<T>::removeElt(T* elt)
00075 {
00076 int fd = elt->fd_;
00077
00078 if (elt_list_.count(fd) == 0)
00079 {
00080 WARN("trying to remove non-existing element (fd `%1')", fd);
00081 return;
00082 }
00083 elt_list_.erase(fd);
00084
00085 #if HAVE_EPOOL
00086 struct epoll_event ev;
00087 int fd = elt->getFd();
00088
00089 ev.events = EPOLLIN;
00090 ev.data.fd = fd;
00091 if (epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, &ev))
00092 throw NetError("epoll_ctl");
00093 #endif
00094 }
00095
00096
00097 template <typename T>
00098 int CxPool<T>::size() const
00099 {
00100 return elt_list_.size();
00101 }
00102
00103 template <typename T>
00104 void CxPool<T>::flush()
00105 {
00106 elt_list_.clear();
00107 }
00108
00109 template <typename T>
00110 const typename CxPool<T>::EltList& CxPool<T>::poll()
00111 {
00112 int nb_ready;
00113
00114 ready_list_.clear();
00115
00116 #if HAVE_EPOOL
00117 struct epoll_event ev[32];
00118
00119 pthread_mutex_unlock(lock_);
00120 nb_ready = epoll_wait(epoll_fd_, &ev, 32, timeout_);
00121 pthread_mutex_lock(lock_);
00122
00123 if (nb_ready < 0 && errno != EINTR)
00124 throw NetError("epoll");
00125 if (nb_ready == 0)
00126 return;
00127
00128 for (int i = 0; i < nb_ready; i++)
00129 {
00130 int fd = ev[i].data.fd;
00131 ready_list_.push_back(std::make_pair(1, elt_list_[fd]));
00132 }
00133
00134 #else
00135
00136 int elt_size = elt_list_.size();
00137 InternalEltIter it;
00138 int i;
00139
00140
00141 if (ev_size_ < elt_size)
00142 {
00143 ev_size_ = elt_size * 2;
00144 delete[] ev_;
00145 ev_ = new struct pollfd[ev_size_];
00146 }
00147
00148
00149 for (it = elt_list_.begin(), i = 0; it != elt_list_.end(); ++it, i++)
00150 {
00151 T* cx = it->second;
00152
00153 if ((cx->state_ & CX_READ) && cx->arrangeReadBuffer())
00154 {
00155
00156 ready_list_.push_back(std::make_pair(E_FD_READ_READY, cx));
00157 }
00158 else if (cx->state_ & (CX_READ | CX_LISTENING))
00159 {
00160 ev_[i].events = POLLIN;
00161 ev_[i].fd = cx->fd_;
00162 ev_[i].revents = 0;
00163 }
00164 else
00165 {
00166
00167
00168
00169 ready_list_.push_back(std::make_pair(E_FD_CONNECTION_CLOSED, cx));
00170 elt_size--;
00171 }
00172 }
00173
00174
00175 if (!ready_list_.empty())
00176 {
00177 LOG1("ready list not empty: %1", ready_list_.size());
00178 return ready_list_;
00179 }
00180
00181 if (lock_)
00182 pthread_mutex_unlock(lock_);
00183 nb_ready = ::poll(ev_, elt_size, timeout_);
00184 if (lock_)
00185 pthread_mutex_lock(lock_);
00186
00187 if (nb_ready < 0 && errno != EINTR)
00188 throw NetSysError("poll");
00189 if (nb_ready == 0)
00190 return ready_list_;
00191
00192 for (int i = 0; i < elt_size; ++i)
00193 if ((ev_[i].revents & POLLIN))
00194 {
00195 T* cx = elt_list_[ev_[i].fd];
00196
00197
00198 if (cx->state_ & CX_READ)
00199 {
00200 cx->arrangeReadBuffer();
00201 int ret = cx->recvData(false);
00202 if (ret == -1)
00203 {
00204 ready_list_.push_back(std::make_pair(E_FD_CONNECTION_CLOSED,
00205 elt_list_[ev_[i].fd]));
00206 }
00207 else
00208 {
00209 cx->buff_recv_tail_ += ret;
00210 if (cx->arrangeReadBuffer())
00211 ready_list_.push_back(std::make_pair(E_FD_READ_READY,
00212 elt_list_[ev_[i].fd]));
00213 }
00214 }
00215 else if (cx->state_ & CX_LISTENING)
00216 {
00217 ready_list_.push_back(std::make_pair(E_FD_CONNECTION_PENDING,
00218 elt_list_[ev_[i].fd]));
00219 }
00220 else
00221 {
00222
00223 WARN("bug spotted in CxPool::poll");
00224 }
00225 }
00226
00227 #endif
00228
00229 return ready_list_;
00230 }