11 #include <KrisLibrary/Timer.h> 12 #include <KrisLibrary/errors.h> 14 #include "threadutils.h" 33 void OnRead(
const std::string& msg);
34 void OnRead_NoLock(
const std::string& msg);
40 int MessageCount() {
return (
int)msgCount; }
42 std::string PeekNewest();
43 std::string Newest() { msgQueue.clear();
return PeekNewest(); }
44 std::vector<std::string> New();
50 std::list<std::string> msgQueue;
51 size_t numDroppedMsgs;
74 std::string OnWrite();
75 std::string OnWrite_NoLock();
81 void Send(
const std::string& msg);
82 int SentCount() {
return msgCount+msgQueue.size(); }
83 int DeliveredCount() {
return (
int)msgCount; }
88 std::list<std::string> msgQueue;
89 size_t numDroppedMsgs;
97 AsyncPipeQueue(
size_t recvQueueSize=1000,
size_t writeQueueSize=1000);
98 virtual void Reset() { reader.Reset(); writer.Reset(); }
99 virtual void Work() {}
103 void OnRead(
const std::string& msg) {
return reader.OnRead(msg); }
104 void OnRead_NoLock(
const std::string& msg) {
return reader.OnRead_NoLock(msg); }
108 std::string
OnWrite() {
return writer.OnWrite(); }
109 std::string OnWrite_NoLock() {
return writer.OnWrite_NoLock(); }
113 int UnreadCount() {
return reader.UnreadCount(); }
114 std::string PeekNewest() {
return reader.PeekNewest(); }
115 std::vector<std::string> New() {
return reader.New(); }
116 std::string Newest() {
return reader.Newest(); }
119 void Send(
const std::string& msg) { writer.Send(msg); }
120 int SentCount() {
return writer.SentCount(); }
121 int DeliveredCount() {
return writer.DeliveredCount(); }
136 virtual bool Start() {
return true; }
137 virtual bool Stop() {
return true; }
145 virtual const std::string*
DoRead() {
return NULL; }
149 virtual bool DoWrite(
const char* msg,
int length) {
return false; }
160 virtual bool DoWrite(
const std::string& msg) {
return DoWrite(msg.c_str(),msg.length()); }
180 enum Format { IntLengthPrepended, NullTerminated, Ascii, Base64 };
188 virtual bool ReadReady();
189 virtual bool WriteReady();
190 virtual const std::string* DoRead();
191 virtual bool DoWrite(
const char* msg,
int length);
203 virtual bool ReadReady();
204 virtual bool WriteReady();
205 virtual bool Start();
208 virtual const std::string* DoRead();
210 virtual bool DoWrite (
const char* str,
int length);
225 virtual bool Start();
227 virtual bool ReadReady();
228 virtual bool WriteReady();
230 virtual const std::string* DoRead();
232 virtual bool DoWrite (
const char* str,
int length);
238 std::vector<std::unique_ptr<File> > clientsockets;
260 virtual void Reset();
263 virtual bool Start();
265 inline bool Connected() {
return initialized && (transport->WriteReady() || transport->ReadReady()); }
266 inline bool WriteReady() {
return initialized && transport->WriteReady(); }
267 inline bool ReadReady() {
return initialized && transport->ReadReady(); }
269 std::unique_ptr<TransportBase> transport;
273 double lastReadTime,lastWriteTime;
290 virtual void Reset();
291 virtual void Work() { LOG4CXX_ERROR(KrisLibrary::logger(),
"No need to call Work on AsyncReaderThread"); }
293 virtual bool Start();
296 std::unique_ptr<TransportBase> transport;
319 virtual void Reset();
320 virtual void Work() { LOG4CXX_ERROR(KrisLibrary::logger(),
"No need to call Work on AsyncReaderThread"); }
322 virtual bool Start();
324 inline bool Connected() {
return initialized && (transport->WriteReady() || transport->ReadReady()); }
325 inline bool WriteReady() {
return initialized && transport->WriteReady(); }
326 inline bool ReadReady() {
return initialized && transport->ReadReady(); }
328 std::unique_ptr<TransportBase> transport;
337 double lastWriteTime;
349 SocketReadWorker(
const char* addr,
bool asServer=
false,
double timeout=Math::Inf)
369 SocketPipeWorker(
const char* addr,
bool asServer=
false,
double timeout=Math::Inf)
virtual void Reset()
Resets the queue and history.
Definition: AsyncIO.cpp:36
Common math typedefs, constants, functions.
AsyncReaderQueue(size_t queueMax=1000)
This will keep only the newest queueMax messages.
Definition: AsyncIO.cpp:12
An asynchronous read/writer that uses multithreading. Subclass will define the transport protocol by ...
Definition: AsyncIO.h:314
bool WriteAvailable() const
Called by subclass to see whether there's a message to send.
Definition: AsyncIO.h:106
A transport protocol that connects as a client to the given address.
Definition: AsyncIO.h:196
virtual bool DoWrite(const char *msg, int length)
Definition: AsyncIO.h:149
Launches a thread for asynchronous reading from a socket.
Definition: AsyncIO.h:346
virtual const std::string * DoRead()
Definition: AsyncIO.h:145
Asynchronous reader/writer with queues.
Definition: AsyncIO.h:94
int MessageCount()
Receive functions.
Definition: AsyncIO.h:112
An asynchronous reader that uses multithreading. User/subclass will initialize the transport protocol...
Definition: AsyncIO.h:285
Asynchronous reader with queue.
Definition: AsyncIO.h:26
virtual bool DoWrite(const std::string &msg)
Definition: AsyncIO.h:160
virtual void Work()
Do some work to read messages from sender – must be done by subclass.
Definition: AsyncIO.h:39
virtual void Work()
Do some work to read messages from sender – must be done by subclass.
Definition: AsyncIO.h:291
virtual void Work()
Do some work to write messages to receiver – must be done by subclass.
Definition: AsyncIO.h:80
A base class for a transport protocol for unstructured data.
Definition: AsyncIO.h:130
A unified interface for reading/writing binary data to file.
std::string OnWrite()
Called by subclass to get the next message to send to the destination.
Definition: AsyncIO.h:108
virtual bool Start()
Subclasses – can the object read?
Definition: AsyncIO.h:136
void OnRead(const std::string &msg)
Definition: AsyncIO.h:103
void OnRead(const std::string &msg)
Called by subclass to add a message onto the queue.
Definition: AsyncIO.cpp:16
virtual bool DoWrite(const char *msg)
Definition: AsyncIO.h:155
A transport protocol that uses STL streams.
Definition: AsyncIO.h:175
An synchronous reader/writer. User/subclass will initialize the transport protocol (usually blocking ...
Definition: AsyncIO.h:255
The logging system used in KrisLibrary.
Launches a thread for asynchronous bidirectional communication with a socket.
Definition: AsyncIO.h:366
Asynchronous writer with queue.
Definition: AsyncIO.h:62
virtual bool ReadReady()
Subclasses – can the object read?
Definition: AsyncIO.h:139
bool WriteAvailable() const
Called by subclass to see whether there's a message to send.
Definition: AsyncIO.h:72
Definition: threadutils.h:70
A transport protocol that hosts a client and sends messages to the clients after they connect ...
Definition: AsyncIO.h:220
void Send(const std::string &msg)
Send functions.
Definition: AsyncIO.h:119
A cross-platform class for reading/writing binary data.
Definition: File.h:47
virtual bool WriteReady()
Subclasses – can the object write?
Definition: AsyncIO.h:141