KrisLibrary  1.0.0
AsyncIO.h
1 #ifndef UTILS_ASYNC_IO
2 #define UTILS_ASYNC_IO
3 
4 #include <string>
5 #include <vector>
6 #include <list>
7 #include <memory>
8 #include <string.h>
9 #include <KrisLibrary/Logger.h>
10 #include <KrisLibrary/File.h>
11 #include <KrisLibrary/Timer.h>
12 #include <KrisLibrary/errors.h>
13 #include <KrisLibrary/math/math.h>
14 #include "threadutils.h"
15 
// AsyncReaderQueue: asynchronous reader with a message queue.
// NOTE(review): the class header line is not present in this listing.
27 {
28  public:
    // Constructs the queue; only the newest queueMax messages are kept.
30  AsyncReaderQueue(size_t queueMax=1000);
31  virtual ~AsyncReaderQueue() {}
    // Called by a subclass to add a received message onto the queue.
33  void OnRead(const std::string& msg);
    // NOTE(review): presumably the same as OnRead but without taking `mutex`
    // -- confirm against AsyncIO.cpp.
34  void OnRead_NoLock(const std::string& msg);
35 
    // Resets the queue and history.
37  virtual void Reset();
    // Do some work to read messages from the sender -- must be done by a
    // subclass; the implementation here is empty.
39  virtual void Work() {}
    // Returns msgCount converted to int.
40  int MessageCount() { return (int)msgCount; }
41  int UnreadCount();
42  std::string PeekNewest();
    // Empties msgQueue, then returns whatever PeekNewest() reports.
    // NOTE(review): the clear() here is not visibly guarded by `mutex`
    // -- confirm whether callers are expected to hold the lock.
43  std::string Newest() { msgQueue.clear(); return PeekNewest(); }
44  std::vector<std::string> New();
45 
46  Mutex mutex;
47  size_t queueMax;
48  size_t msgCount;
49  std::string msgLast;
50  std::list<std::string> msgQueue;
51  size_t numDroppedMsgs;
52 };
53 
// AsyncWriterQueue: asynchronous writer with a message queue.
// NOTE(review): the class header line is not present in this listing.
63 {
64  public:
68  AsyncWriterQueue(size_t queueMax=1000);
69  virtual ~AsyncWriterQueue() {}
70 
    // Called by a subclass to see whether there is a message to send;
    // true when msgQueue is non-empty.
72  bool WriteAvailable() const { return !msgQueue.empty(); }
74  std::string OnWrite();
    // NOTE(review): presumably the same as OnWrite but without taking `mutex`
    // -- confirm against AsyncIO.cpp.
75  std::string OnWrite_NoLock();
76 
78  virtual void Reset();
    // Do some work to write messages to the receiver -- must be done by a
    // subclass; the implementation here is empty.
80  virtual void Work() {}
81  void Send(const std::string& msg);
    // Returns msgCount plus the number of messages currently in msgQueue.
    // NOTE(review): the sum is a size_t that is implicitly narrowed to int,
    // unlike DeliveredCount() which casts explicitly.
82  int SentCount() { return msgCount+msgQueue.size(); }
    // Returns msgCount converted to int.
83  int DeliveredCount() { return (int)msgCount; }
84 
85  Mutex mutex;
86  size_t queueMax;
87  size_t msgCount;
88  std::list<std::string> msgQueue;
89  size_t numDroppedMsgs;
90 };
91 
// AsyncPipeQueue: asynchronous reader/writer with queues. Every method below
// forwards to the `reader` or `writer` member.
// NOTE(review): the class header line is not present in this listing.
95 {
96  public:
97  AsyncPipeQueue(size_t recvQueueSize=1000,size_t writeQueueSize=1000);
    // Resets both the reader and the writer queue.
98  virtual void Reset() { reader.Reset(); writer.Reset(); }
    // Empty implementation here.
99  virtual void Work() {}
100 
    // Subclass-side hooks: forward a received message to the reader queue.
103  void OnRead(const std::string& msg) { return reader.OnRead(msg); }
104  void OnRead_NoLock(const std::string& msg) { return reader.OnRead_NoLock(msg); }
    // Called by a subclass to see whether there is a message to send.
106  bool WriteAvailable() const { return writer.WriteAvailable(); }
    // Called by a subclass to get the next message to send to the destination.
108  std::string OnWrite() { return writer.OnWrite(); }
109  std::string OnWrite_NoLock() { return writer.OnWrite_NoLock(); }
110 
    // Receive functions (forwarded to `reader`).
112  int MessageCount() { return reader.MessageCount(); }
113  int UnreadCount() { return reader.UnreadCount(); }
114  std::string PeekNewest() { return reader.PeekNewest(); }
115  std::vector<std::string> New() { return reader.New(); }
116  std::string Newest() { return reader.Newest(); }
117 
    // Send functions (forwarded to `writer`).
119  void Send(const std::string& msg) { writer.Send(msg); }
120  int SentCount() { return writer.SentCount(); }
121  int DeliveredCount() { return writer.DeliveredCount(); }
122 
123  AsyncReaderQueue reader;
124  AsyncWriterQueue writer;
125 };
126 
// TransportBase: a base class for a transport protocol for unstructured data.
// NOTE(review): the class header line is not present in this listing.
131 {
132  public:
133  TransportBase() {}
134  virtual ~TransportBase() {}
     // Default implementations of Start/Stop do nothing and return true.
136  virtual bool Start() { return true; }
137  virtual bool Stop() { return true; }
     // Subclasses -- can the object read?  Default: true.
139  virtual bool ReadReady() { return true; }
     // Subclasses -- can the object write?  Default: true.
141  virtual bool WriteReady() { return true; }
     // Default implementation returns NULL.
     // NOTE(review): the lifetime of the returned string is not visible here
     // -- confirm in the subclasses.
145  virtual const std::string* DoRead() { return NULL; }
     // Default implementation returns false without writing anything.
149  virtual bool DoWrite(const char* msg,int length) { return false; }
150 
     // Convenience overload: forwards to DoWrite(msg,length) using strlen(msg)
     // as the length.
155  virtual bool DoWrite(const char* msg) { return DoWrite(msg,strlen(msg)); }
     // Convenience overload: forwards to DoWrite(msg,length) using the string's
     // c_str() and length().
160  virtual bool DoWrite(const std::string& msg) { return DoWrite(msg.c_str(),msg.length()); }
161 };
162 
163 
// StreamTransport: a transport protocol that uses STL streams.
// NOTE(review): the class header line is not present in this listing.
176 {
177  public:
     // Input / output streams, held as raw pointers.
     // NOTE(review): ownership is not visible here; the constructors take
     // references, so presumably the caller keeps the streams alive -- confirm.
178  std::istream* in;
179  std::ostream* out;
     // Message framing options.
180  enum Format { IntLengthPrepended, NullTerminated, Ascii, Base64 };
181  Format format;
182  std::string buffer;
183 
     // Constructors for an input-only, output-only, or bidirectional transport.
184  StreamTransport(std::istream& in);
185  StreamTransport(std::ostream& out);
186  StreamTransport(std::istream& in,std::ostream& out);
187  virtual ~StreamTransport() {}
188  virtual bool ReadReady();
189  virtual bool WriteReady();
190  virtual const std::string* DoRead();
191  virtual bool DoWrite(const char* msg,int length);
192 };
193 
194 
// SocketClientTransport: a transport protocol that connects as a client to
// the given address.
// NOTE(review): the class header line is not present in this listing.
197 {
198  public:
     // Takes the address to connect to.
200  SocketClientTransport(const char* addr);
     // Takes an address together with a SOCKET handle.
     // NOTE(review): presumably wraps an already-opened socket -- confirm.
202  SocketClientTransport(const char* addr,SOCKET socket);
203  virtual bool ReadReady();
204  virtual bool WriteReady();
205  virtual bool Start();
206  virtual bool Stop();
208  virtual const std::string* DoRead();
210  virtual bool DoWrite (const char* str,int length);
211 
212  std::string addr;
213  File socket;
214  Mutex mutex;
215  std::string buf;
216 };
217 
// SocketServerTransport: a transport protocol that hosts a client and sends
// messages to the clients after they connect.
// NOTE(review): the class header line is not present in this listing.
221 {
222  public:
     // Takes the address to host on and the maximum number of clients
     // (default 1).
223  SocketServerTransport(const char* addr,int maxclients=1);
225  virtual bool Start();
226  virtual bool Stop();
227  virtual bool ReadReady();
228  virtual bool WriteReady();
230  virtual const std::string* DoRead();
232  virtual bool DoWrite (const char* str,int length);
233 
234  std::string addr;
235  int serversocket;
236  int maxclients;
237  Mutex mutex;
     // One owned File per client socket.
238  std::vector<std::unique_ptr<File> > clientsockets;
     // NOTE(review): presumably an index into clientsockets -- confirm.
239  int currentclient;
240  std::string buf;
241 };
242 
// SyncPipe: a synchronous reader/writer built on AsyncPipeQueue. The
// user/subclass initializes the `transport` member.
255 class SyncPipe : public AsyncPipeQueue
256 {
257  public:
258  SyncPipe();
259  virtual ~SyncPipe();
260  virtual void Reset();
261  virtual void Work();
263  virtual bool Start();
264  virtual void Stop();
     // True when initialized and the transport reports it can write or read.
     // `transport` is dereferenced only when `initialized` is true.
     // NOTE(review): assumes transport is non-null whenever initialized is set
     // -- confirm in Start().
265  inline bool Connected() { return initialized && (transport->WriteReady() || transport->ReadReady()); }
     // True when initialized and the transport reports it can write.
266  inline bool WriteReady() { return initialized && transport->WriteReady(); }
     // True when initialized and the transport reports it can read.
267  inline bool ReadReady() { return initialized && transport->ReadReady(); }
268 
     // Owned transport used for the actual I/O.
269  std::unique_ptr<TransportBase> transport;
270 
271  bool initialized;
272  Timer timer;
273  double lastReadTime,lastWriteTime;
274 };
275 
276 
// AsyncReaderThread: an asynchronous reader that uses multithreading. The
// user/subclass initializes the `transport` member.
// NOTE(review): the class header line is not present in this listing.
286 {
287  public:
     // Timeout defaults to Math::Inf.
288  AsyncReaderThread(double timeout=Math::Inf);
289  virtual ~AsyncReaderThread();
290  virtual void Reset();
     // Calling Work() on this class only logs an error saying it is unnecessary.
291  virtual void Work() { LOG4CXX_ERROR(KrisLibrary::logger(),"No need to call Work on AsyncReaderThread"); }
293  virtual bool Start();
294  virtual void Stop();
295 
     // Owned transport used for the actual I/O.
296  std::unique_ptr<TransportBase> transport;
297 
298  bool initialized;
299  Thread thread;
300  double timeout;
301 
302  Timer timer;
303  double lastReadTime;
304 };
305 
// AsyncPipeThread: an asynchronous reader/writer that uses multithreading.
// The `transport` member defines the transport protocol.
// NOTE(review): the class header line is not present in this listing.
315 {
316  public:
     // Timeout defaults to Math::Inf.
317  AsyncPipeThread(double timeout=Math::Inf);
318  virtual ~AsyncPipeThread();
319  virtual void Reset();
     // Calling Work() on this class only logs an error saying it is unnecessary.
     // The message names this class (it previously named AsyncReaderThread).
320  virtual void Work() { LOG4CXX_ERROR(KrisLibrary::logger(),"No need to call Work on AsyncPipeThread"); }
322  virtual bool Start();
323  virtual void Stop();
     // True when initialized and the transport reports it can write or read.
     // `transport` is dereferenced only when `initialized` is true.
     // NOTE(review): assumes transport is non-null whenever initialized is set
     // -- confirm in Start().
324  inline bool Connected() { return initialized && (transport->WriteReady() || transport->ReadReady()); }
     // True when initialized and the transport reports it can write.
325  inline bool WriteReady() { return initialized && transport->WriteReady(); }
     // True when initialized and the transport reports it can read.
326  inline bool ReadReady() { return initialized && transport->ReadReady(); }
327 
     // Owned transport used for the actual I/O.
328  std::unique_ptr<TransportBase> transport;
329 
330  bool initialized;
331  Thread workerThread;
332  double timeout;
333 
334  Timer timer;
335  Mutex mutex;
336  double lastReadTime;
337  double lastWriteTime;
338 };
339 
340 
// SocketReadWorker: launches a thread for asynchronous reading from a socket.
// NOTE(review): the class header line is not present in this listing; the
// initializer list shows construction via AsyncReaderThread(timeout).
347 {
348  public:
     // addr:     socket address handed to the transport.
     // asServer: when true a SocketServerTransport is installed, otherwise a
     //           SocketClientTransport.
     // timeout:  forwarded to AsyncReaderThread (default Math::Inf).
349  SocketReadWorker(const char* addr,bool asServer=false,double timeout=Math::Inf)
350  :AsyncReaderThread(timeout)
351  {
352  if(asServer)
353  transport.reset(new SocketServerTransport(addr));
354  else
355  transport.reset(new SocketClientTransport(addr));
356  }
357 };
358 
// SocketPipeWorker: launches a thread for asynchronous bidirectional
// communication with a socket.
// NOTE(review): the class header line is not present in this listing; the
// initializer lists show construction via AsyncPipeThread(timeout).
367 {
368  public:
     // addr:     socket address handed to the transport.
     // asServer: when true a SocketServerTransport is installed, otherwise a
     //           SocketClientTransport.
     // timeout:  forwarded to AsyncPipeThread (default Math::Inf).
369  SocketPipeWorker(const char* addr,bool asServer=false,double timeout=Math::Inf)
370  :AsyncPipeThread(timeout)
371  {
372  if(asServer)
373  transport.reset(new SocketServerTransport(addr));
374  else
375  transport.reset(new SocketClientTransport(addr));
376  }
377 
     // Installs a SocketClientTransport built from an address and a SOCKET
     // handle.
     // NOTE(review): presumably for an already-opened socket -- confirm.
378  SocketPipeWorker(const char* addr,SOCKET socket,double timeout=Math::Inf)
379  :AsyncPipeThread(timeout)
380  {
381  transport.reset(new SocketClientTransport(addr,socket));
382  }
383 };
384 
385 #endif
virtual void Reset()
Resets the queue and history.
Definition: AsyncIO.cpp:36
Common math typedefs, constants, functions.
AsyncReaderQueue(size_t queueMax=1000)
This will keep only the newest queueMax messages.
Definition: AsyncIO.cpp:12
An asynchronous reader/writer that uses multithreading. Subclass will define the transport protocol by ...
Definition: AsyncIO.h:314
bool WriteAvailable() const
Called by subclass to see whether there's a message to send.
Definition: AsyncIO.h:106
A transport protocol that connects as a client to the given address.
Definition: AsyncIO.h:196
virtual bool DoWrite(const char *msg, int length)
Definition: AsyncIO.h:149
Launches a thread for asynchronous reading from a socket.
Definition: AsyncIO.h:346
virtual const std::string * DoRead()
Definition: AsyncIO.h:145
Asynchronous reader/writer with queues.
Definition: AsyncIO.h:94
int MessageCount()
Receive functions.
Definition: AsyncIO.h:112
An asynchronous reader that uses multithreading. User/subclass will initialize the transport protocol...
Definition: AsyncIO.h:285
Asynchronous reader with queue.
Definition: AsyncIO.h:26
virtual bool DoWrite(const std::string &msg)
Definition: AsyncIO.h:160
virtual void Work()
Do some work to read messages from sender – must be done by subclass.
Definition: AsyncIO.h:39
virtual void Work()
Do some work to read messages from sender – must be done by subclass.
Definition: AsyncIO.h:291
virtual void Work()
Do some work to write messages to receiver – must be done by subclass.
Definition: AsyncIO.h:80
A base class for a transport protocol for unstructured data.
Definition: AsyncIO.h:130
A unified interface for reading/writing binary data to file.
std::string OnWrite()
Called by subclass to get the next message to send to the destination.
Definition: AsyncIO.h:108
virtual bool Start()
Subclasses – can the object read?
Definition: AsyncIO.h:136
void OnRead(const std::string &msg)
Definition: AsyncIO.h:103
void OnRead(const std::string &msg)
Called by subclass to add a message onto the queue.
Definition: AsyncIO.cpp:16
virtual bool DoWrite(const char *msg)
Definition: AsyncIO.h:155
A transport protocol that uses STL streams.
Definition: AsyncIO.h:175
A synchronous reader/writer. User/subclass will initialize the transport protocol (usually blocking ...
Definition: AsyncIO.h:255
The logging system used in KrisLibrary.
Launches a thread for asynchronous bidirectional communication with a socket.
Definition: AsyncIO.h:366
Asynchronous writer with queue.
Definition: AsyncIO.h:62
virtual bool ReadReady()
Subclasses – can the object read?
Definition: AsyncIO.h:139
bool WriteAvailable() const
Called by subclass to see whether there's a message to send.
Definition: AsyncIO.h:72
Definition: threadutils.h:70
A transport protocol that hosts a client and sends messages to the clients after they connect ...
Definition: AsyncIO.h:220
void Send(const std::string &msg)
Send functions.
Definition: AsyncIO.h:119
Definition: Timer.h:6
A cross-platform class for reading/writing binary data.
Definition: File.h:47
virtual bool WriteReady()
Subclasses – can the object write?
Definition: AsyncIO.h:141