Commit bb9f0c4d authored by John Selbie's avatar John Selbie

latest changes

parent ff723d30
...@@ -221,6 +221,7 @@ public: ...@@ -221,6 +221,7 @@ public:
_lookuptable = table; _lookuptable = table;
_indexlist = indexlist; _indexlist = indexlist;
_fIndexValid = (_indexlist != NULL); _fIndexValid = (_indexlist != NULL);
_indexStart = 0;
Reset(); Reset();
...@@ -248,6 +249,7 @@ public: ...@@ -248,6 +249,7 @@ public:
_size = 0; _size = 0;
_fIndexValid = (_indexlist != NULL); // index is valid when we are empty _fIndexValid = (_indexlist != NULL); // index is valid when we are empty
_indexStart = 0;
} }
bool IsValid() bool IsValid()
......
...@@ -38,6 +38,13 @@ class CEpoll : ...@@ -38,6 +38,13 @@ class CEpoll :
private: private:
int _epollfd; int _epollfd;
epoll_event* _events;
size_t _sizeEvents; // total allocated size for events
size_t _pendingCount; // number of valid events in _events
size_t _currentEventIndex; // which one to process next
uint32_t ToNativeFlags(uint32_t eventflags); uint32_t ToNativeFlags(uint32_t eventflags);
uint32_t FromNativeFlags(uint32_t eventflags); uint32_t FromNativeFlags(uint32_t eventflags);
...@@ -58,7 +65,11 @@ public: ...@@ -58,7 +65,11 @@ public:
CEpoll::CEpoll() : CEpoll::CEpoll() :
_epollfd(-1) _epollfd(-1),
_events(NULL),
_sizeEvents(0),
_pendingCount(0),
_currentEventIndex(0)
{ {
} }
...@@ -102,14 +113,27 @@ uint32_t CEpoll::FromNativeFlags(uint32_t eventflags) ...@@ -102,14 +113,27 @@ uint32_t CEpoll::FromNativeFlags(uint32_t eventflags)
HRESULT CEpoll::Initialize(size_t maxSockets) HRESULT CEpoll::Initialize(size_t maxSockets)
{ {
HRESULT hr = S_OK;
ASSERT(_epollfd == -1); ASSERT(_epollfd == -1);
Close(); Close();
_epollfd = epoll_create(maxSockets); // maxSockets is likely ignored by epoll_create _epollfd = epoll_create(maxSockets); // maxSockets is likely ignored by epoll_create
if (_epollfd == -1) ChkIf(_epollfd == -1, ERRNOHR);
_sizeEvents = maxSockets;
_events = new epoll_event[maxSockets];
ChkIf(_events == NULL, E_OUTOFMEMORY);
_pendingCount = 0;
_currentEventIndex = 0;
Cleanup:
if (FAILED(hr))
{ {
return ERRNOHR; Close();
} }
return S_OK; return S_OK;
} }
...@@ -119,8 +143,15 @@ HRESULT CEpoll::Close() ...@@ -119,8 +143,15 @@ HRESULT CEpoll::Close()
if (_epollfd != -1) if (_epollfd != -1)
{ {
close(_epollfd); close(_epollfd);
_epollfd = -1;
} }
delete [] _events;
_events = NULL;
_sizeEvents = 0;
_pendingCount = 0;
_currentEventIndex = 0;
return S_OK; return S_OK;
} }
...@@ -144,6 +175,12 @@ Cleanup: ...@@ -144,6 +175,12 @@ Cleanup:
HRESULT CEpoll::Remove(int fd) HRESULT CEpoll::Remove(int fd)
{ {
// Remove doesn't bother to check to see if the socket is within any
// unprocessed event within _events. A more robust polling and eventing
// library might want to check this. For the stun server, "Remove" gets
// called immediately after WaitForNextEvent in most cases. That is, the socket
// we just got notified for isn't going to be within the _events array
HRESULT hr = S_OK; HRESULT hr = S_OK;
epoll_event ev={}; // pass empty ev, because some implementations of epoll_ctl can't handle a NULL event struct epoll_event ev={}; // pass empty ev, because some implementations of epoll_ctl can't handle a NULL event struct
...@@ -175,18 +212,30 @@ Cleanup: ...@@ -175,18 +212,30 @@ Cleanup:
HRESULT CEpoll::WaitForNextEvent(PollEvent* pPollEvent, int timeoutMilliseconds) HRESULT CEpoll::WaitForNextEvent(PollEvent* pPollEvent, int timeoutMilliseconds)
{ {
HRESULT hr = S_OK; HRESULT hr = S_OK;
epoll_event ev = {}; epoll_event *pEvent = NULL;
int ret; int ret = 0;
ChkIfA(_epollfd==-1, E_UNEXPECTED); ChkIfA(_epollfd==-1, E_UNEXPECTED);
if (_currentEventIndex >= _pendingCount)
{
_currentEventIndex = 0;
_pendingCount = 0;
ret = ::epoll_wait(_epollfd, _events, _sizeEvents, timeoutMilliseconds);
ChkIf(ret <= -1, ERRNOHR);
ChkIf(ret == 0, S_FALSE);
_pendingCount = (size_t)ret;
}
ret = ::epoll_wait(_epollfd, &ev, 1, timeoutMilliseconds);
ChkIf(ret == -1, ERRNOHR);
ChkIf(ret == 0, S_FALSE); pEvent = &_events[_currentEventIndex];
_currentEventIndex++;
pPollEvent->fd = ev.data.fd;
pPollEvent->eventflags = FromNativeFlags(ev.events); pPollEvent->fd = pEvent->data.fd;
pPollEvent->eventflags = FromNativeFlags(pEvent->events);
Cleanup: Cleanup:
return hr; return hr;
...@@ -392,7 +441,7 @@ HRESULT CPoll::WaitForNextEvent(PollEvent* pPollEvent, int timeoutMilliseconds) ...@@ -392,7 +441,7 @@ HRESULT CPoll::WaitForNextEvent(PollEvent* pPollEvent, int timeoutMilliseconds)
ChkIfA(pPollEvent == NULL, E_INVALIDARG); ChkIfA(pPollEvent == NULL, E_INVALIDARG);
pPollEvent->eventflags = 0; pPollEvent->eventflags = 0;
ChkIfA(size == 0, S_FALSE); ChkIf(size == 0, S_FALSE);
// check first to see if there is a pending event from the last poll() call // check first to see if there is a pending event from the last poll() call
fFound = FindNextEvent(pPollEvent); fFound = FindNextEvent(pPollEvent);
...@@ -401,6 +450,8 @@ HRESULT CPoll::WaitForNextEvent(PollEvent* pPollEvent, int timeoutMilliseconds) ...@@ -401,6 +450,8 @@ HRESULT CPoll::WaitForNextEvent(PollEvent* pPollEvent, int timeoutMilliseconds)
{ {
ASSERT(_unreadcount == 0); ASSERT(_unreadcount == 0);
_unreadcount = 0;
list = _fds.data(); list = _fds.data();
ret = poll(list, size, timeoutMilliseconds); ret = poll(list, size, timeoutMilliseconds);
...@@ -447,19 +498,13 @@ bool CPoll::FindNextEvent(PollEvent* pEvent) ...@@ -447,19 +498,13 @@ bool CPoll::FindNextEvent(PollEvent* pEvent)
pEvent->eventflags = FromNativeFlags(list[slotindex].revents); pEvent->eventflags = FromNativeFlags(list[slotindex].revents);
list[slotindex].revents = 0; list[slotindex].revents = 0;
fFound = true; fFound = true;
_rotation++;
_unreadcount--;
break; break;
} }
} }
if (fFound) // don't increment _rotation if we didn't find anything
{
_rotation++;
}
else
{
_unreadcount = _unreadcount - 1;
// don't increment rotation if we didn't find anything
}
return fFound; return fFound;
} }
......
...@@ -268,8 +268,10 @@ HRESULT CTCPStunThread::Init(const TransportAddressSet& tsaListen, const Transpo ...@@ -268,8 +268,10 @@ HRESULT CTCPStunThread::Init(const TransportAddressSet& tsaListen, const Transpo
ChkA(CreateListenSockets()); ChkA(CreateListenSockets());
ChkA(CreatePipes()); ChkA(CreatePipes());
ChkA(CreatePollingInstance(IPOLLING_TYPE_BEST, (size_t)_maxConnections, _spPolling.GetPointerPointer())); // +5 for listening sockets and pipe
//ChkA(CreatePollingInstance(IPOLLING_TYPE_BEST, (size_t)(_maxConnections + 5), _spPolling.GetPointerPointer()));
ChkA(CreatePollingInstance(IPOLLING_TYPE_POLL, (size_t)(_maxConnections + 5), _spPolling.GetPointerPointer()));
// add listen socket to epoll // add listen socket to epoll
ASSERT(_fListenSocketsOnEpoll == false); ASSERT(_fListenSocketsOnEpoll == false);
...@@ -520,7 +522,7 @@ StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket) ...@@ -520,7 +522,7 @@ StunConnection* CTCPStunThread::AcceptConnection(CStunSocket* pListenSocket)
pConn->_stunsocket.GetLocalAddress().ToStringBuffer(szIPLocal, ARRAYSIZE(szIPLocal)); pConn->_stunsocket.GetLocalAddress().ToStringBuffer(szIPLocal, ARRAYSIZE(szIPLocal));
pConn->_stunsocket.GetRemoteAddress().ToStringBuffer(szIPRemote, ARRAYSIZE(szIPRemote)); pConn->_stunsocket.GetRemoteAddress().ToStringBuffer(szIPRemote, ARRAYSIZE(szIPRemote));
Logging::LogMsg(LL_VERBOSE, "accepting new connection on socket %d from %s on interface %s", pConn->_stunsocket.GetSocketHandle(), szIPRemote, szIPLocal); Logging::LogMsg(LL_VERBOSE, "accepting new connection on socket %d from %s on interface %s", pConn->_stunsocket.GetSocketHandle(), szIPRemote, szIPLocal);
} }
Cleanup: Cleanup:
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include "cmdlineparser.h" #include "cmdlineparser.h"
#include "oshelper.h" #include "oshelper.h"
#include "prettyprint.h" #include "prettyprint.h"
#include "polling.h"
#include "testpolling.h"
void ReaderFuzzTest() void ReaderFuzzTest()
{ {
...@@ -78,6 +80,7 @@ void RunUnitTests() ...@@ -78,6 +80,7 @@ void RunUnitTests()
boost::shared_ptr<CTestClientLogic> spTestClientLogic(new CTestClientLogic); boost::shared_ptr<CTestClientLogic> spTestClientLogic(new CTestClientLogic);
boost::shared_ptr<CTestRecvFromEx> spTestRecvFromEx(new CTestRecvFromEx); boost::shared_ptr<CTestRecvFromEx> spTestRecvFromEx(new CTestRecvFromEx);
boost::shared_ptr<CTestFastHash> spTestFastHash(new CTestFastHash); boost::shared_ptr<CTestFastHash> spTestFastHash(new CTestFastHash);
boost::shared_ptr<CTestPolling> spTestPolling(new CTestPolling);
vecTests.push_back(spTestDataStream.get()); vecTests.push_back(spTestDataStream.get());
vecTests.push_back(spTestReader.get()); vecTests.push_back(spTestReader.get());
...@@ -88,6 +91,7 @@ void RunUnitTests() ...@@ -88,6 +91,7 @@ void RunUnitTests()
vecTests.push_back(spTestClientLogic.get()); vecTests.push_back(spTestClientLogic.get());
vecTests.push_back(spTestRecvFromEx.get()); vecTests.push_back(spTestRecvFromEx.get());
vecTests.push_back(spTestFastHash.get()); vecTests.push_back(spTestFastHash.get());
vecTests.push_back(spTestPolling.get());
for (size_t index = 0; index < vecTests.size(); index++) for (size_t index = 0; index < vecTests.size(); index++)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment