WvStreams
wvstream.h
1/* -*- Mode: C++ -*-
2 * Worldvisions Weaver Software:
3 * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4 *
5 * Provides basic streaming I/O support.
6 */
7#ifndef __WVSTREAM_H
8#define __WVSTREAM_H
9
10#include "iwvstream.h"
11#include "wvtimeutils.h"
12#include "wvstreamsdebugger.h"
13#include <errno.h>
14#include <limits.h>
15#include "wvattrs.h"
16
// Unified support for streams, that is, sequences of bytes that may or may
// not be ready for read/write.
//
// NOTE(review): this listing carries the original header line numbers fused
// onto each line, and a few declarations and signature lines are missing
// from it.  Comments below describe only what the listing and its index show.
24class WvStream: public IWvStream
25{
27
    // stream name: returned by wsname(), assigned by set_wsname()
28 WvString my_wsname;
    // stream id: returned by wsid()
29 WSID my_wsid;
    // attribute store read by getattr() and written by setattr()
30 WvAttrs attrs;
31public:
37
43
46
49
55
    // True if noread()/nowrite()/close() have been called, respectively.
57 bool stop_read, stop_write, closed;
58
    // Basic constructor for just a do-nothing WvStream.
60 WvStream();
61 virtual ~WvStream();
62
    // Close the stream if it is open; isok() becomes false from now on.
70 virtual void close();
71
    // Override seterr() from WvError so that it auto-closes the stream.
73 virtual void seterr(int _errnum);
    // String form: forwards specialerr straight to WvErrorBase::seterr().
74 void seterr(WvStringParm specialerr)
75 { WvErrorBase::seterr(specialerr); }
    // Format form: builds a WvString from the format arguments and hands it
    // to the string overload above.
76 void seterr(WVSTRING_FORMAT_DECL)
77 { seterr(WvString(WVSTRING_FORMAT_CALL)); }
78
    // return true if the stream is actually usable right now
80 virtual bool isok() const;
81
    // read a data block on the stream.
83 virtual size_t read(void *buf, size_t count);
84
94 virtual size_t read(WvBuf &outbuf, size_t count);
95
    // Puts data back into the internal buffer of the stream.
101 virtual void unread(WvBuf &outbuf, size_t count);
102
    // Write data to the stream.
109 virtual size_t write(const void *buf, size_t count);
110
118 virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
119
    // set the maximum size of outbuf, beyond which a call to write() will
    // return 0.  Stored in max_outbuf_size.
129 void outbuf_limit(size_t size)
130 { max_outbuf_size = size; }
131
    // Shuts down the reading side of the stream.
132 virtual void noread();
    // Shuts down the writing side of the stream.
133 virtual void nowrite();
    // Auto-close the stream if the time is right.
134 virtual void maybe_autoclose();
135
    // Returns true if the stream is readable.
136 virtual bool isreadable();
    // Returns true if the stream is writable (without using the outbuf).
137 virtual bool iswritable();
138
    // unbuffered I/O functions; these ignore the buffer, which is handled by
    // read().  The base implementation always returns 0.
146 virtual size_t uread(void *buf, size_t count)
147 { return 0; /* basic WvStream doesn't actually do anything! */ }
148
    // unbuffered I/O functions; these ignore the buffer, which is handled by
    // write().  The base implementation returns count unchanged.
156 virtual size_t uwrite(const void *buf, size_t count)
157 { return count; /* basic WvStream doesn't actually do anything! */ }
158
    // Read up to one line of data from the stream and return a pointer to
    // the internal buffer.  This overload simply forwards its arguments to
    // blocking_getline().
175 char *getline(time_t wait_msec = 0,
176 char separator = '\n', int readahead = 1024)
177 {
178 return blocking_getline(wait_msec, separator, readahead);
179 }
180
    // Auto-convert int to time_t.
182 char *getline(int wait_msec,
183 char separator = '\n', int readahead = 1024)
184 {
185 return getline(time_t(wait_msec), separator, readahead);
186 }
187
    // Auto-convert double to time_t.
189 char *getline(double wait_msec,
190 char separator = '\n', int readahead = 1024)
191 {
192 return getline(time_t(wait_msec), separator, readahead);
193 }
194
195private:
    // NOTE(review): private overloads with no body in this listing;
    // presumably they exist so that getline(char) and getline(bool) calls
    // fail to compile instead of converting to a timeout - confirm.
200 char *getline(char, int i = 0);
201 char *getline(bool, int i = 0);
202public:
203
    // This is a version of getline() that allows you to block for more data
    // to arrive.
215 char *blocking_getline(time_t wait_msec, int separator = '\n',
216 int readahead = 1024);
217
    // This is a version of blocking_getline() that uses continue_select to
    // avoid blocking other streams.
222 char *continue_getline(time_t wait_msec, int separator = '\n',
223 int readahead = 1024);
224
    // force read() to not return any bytes unless count bytes can be read
    // at once.  Stored in queue_min.
232 void queuemin(size_t count)
233 { queue_min = count; }
234
    // drain the input buffer (read and discard data until select(0) returns
    // false)
239 void drain();
240
    // force write() to always buffer output.  Records the setting in
    // outbuf_delayed_flush and sets want_to_flush to its opposite.
246 void delay_output(bool is_delayed)
247 {
248 outbuf_delayed_flush = is_delayed;
249 want_to_flush = !is_delayed;
250 }
251
    // if true, force write() to call flush() each time, the default
    // behaviour.
258 void auto_flush(bool is_automatic)
259 { is_auto_flush = is_automatic; }
260
    // flush the output buffer, if we can do it without delaying more than
    // msec_timeout milliseconds at a time.
267 virtual bool flush(time_t msec_timeout);
268
    // Returns true if we want to flush the output buffer right now.
269 virtual bool should_flush();
270
    // flush the output buffer automatically as select() is called.
277 void flush_then_close(int msec_timeout);
278
    // pre_select() sets up for eventually calling select().
300 virtual void pre_select(SelectInfo &si);
301
    // NOTE(review): the signature line (306) for this body is missing from
    // the listing; the index records it as
    // void pre_select(SelectInfo &si, const SelectRequest &r).
    // A more convenient version of pre_select() usable for overriding the
    // want value temporarily: si.wants is saved, replaced by r for the
    // duration of pre_select(si), then restored.
307 {
308 SelectRequest oldwant = si.wants;
309 si.wants = r;
310 pre_select(si);
311 si.wants = oldwant;
312 }
313
    // NOTE(review): signature line (318) missing; the index records it as
    // void xpre_select(SelectInfo &si, const SelectRequest &r).
    // Like pre_select(), but still exists even if you override the other
    // pre_select() in a subclass.
319 { pre_select(si, r); }
320
    // post_select() is called after select(), and returns true if this
    // object is now ready.
333 virtual bool post_select(SelectInfo &si);
334
    // NOTE(review): signature line (339) missing; the index records it as
    // bool xpost_select(SelectInfo &si, const SelectRequest &r).
    // Like post_select(), but still exists even if you override the other
    // post_select() in a subclass.
340 { return post_select(si, r); }
341
    // NOTE(review): signature line (346) missing; the index records it as
    // bool post_select(SelectInfo &si, const SelectRequest &r).
    // A more convenient version of post_select() usable for overriding the
    // want value temporarily: si.wants is saved, replaced by r for the
    // duration of post_select(si), then restored before the result is
    // returned.
347 {
348 SelectRequest oldwant = si.wants;
349 si.wants = r;
350 bool val = post_select(si);
351 si.wants = oldwant;
352 return val;
353 }
354
    // Return true if any of the requested features are true on the stream.
    // Calls _select() with readable, writable and isexcept all false and
    // forceable set to true.
376 bool select(time_t msec_timeout)
377 { return _select(msec_timeout, false, false, false, true); }
378
    // Exactly the same as: if (select(timeout)) callback();
391 void runonce(time_t msec_timeout = -1)
392 { if (select(msec_timeout)) callback(); }
393
    // This version of select() sets forceable==false, so the exact
    // readable/writable/isex values passed in are handed to _select().
415 bool select(time_t msec_timeout,
416 bool readable, bool writable, bool isex = false)
417 { return _select(msec_timeout, readable, writable, isex, false); }
418
425
    // Use force_select() to force one or more particular modes (readable,
    // writable, or isexception) to true.
434 void force_select(bool readable, bool writable, bool isexception = false);
435
    // Undo a previous force_select().
440 void undo_force_select(bool readable, bool writable,
441 bool isexception = false);
442
    // return to the caller from execute(), but do not really return
    // exactly; this uses WvCont::yield().
460 bool continue_select(time_t msec_timeout);
461
468
    // get the remote address from which the last data block was received.
473 virtual const WvAddr *src() const;
474
    // define the callback function for this stream, called whenever the
    // callback() member is run.
479 void setcallback(IWvStreamCallback _callfunc);
480
    // Sets a callback to be invoked when the stream is readable.
482 IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
483
    // Sets a callback to be invoked when the stream is writable.
485 IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
486
    // Sets a callback to be invoked when the stream is in exception state.
489 IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
490
    // Sets a callback to be invoked on close().
492 IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
493
    // set the callback function for this stream to an internal routine that
    // auto-forwards incoming stream data.
    // NOTE(review): the index description is truncated; s is presumably the
    // destination stream - confirm.
499 void autoforward(WvStream &s);
500
    // Stops autoforwarding.
502 void noautoforward();
503 static void autoforward_callback(WvStream &input, WvStream &output);
504
    // A wrapper that is compatible with WvCont, but calls the real callback.
508 void *_callwrap(void *);
509
    // Actually call the registered callfunc and execute().
513 void _callback();
514
    // if the stream has a callback function defined, call it now.
519 virtual void callback();
520
    // set an alarm.
525 void alarm(time_t msec_timeout);
526
    // return the number of milliseconds remaining before the alarm will go
    // off; -1 means no alarm is set.
532 time_t alarm_remaining();
533
    // NOTE(review): signature line (538) missing; the index records it as
    // size_t write(WvStringParm s).
    // print a preformatted WvString to the stream, by passing its cstr()
    // and len() to write().
539 { return write(s.cstr(), s.len()); }
    // print() and operator() on a WvStringParm both forward to write(s).
540 size_t print(WvStringParm s)
541 { return write(s); }
542 size_t operator() (WvStringParm s)
543 { return write(s); }
544
    // preformat and write() a string.
546 size_t print(WVSTRING_FORMAT_DECL)
547 { return write(WvString(WVSTRING_FORMAT_CALL)); }
    // same as the formatted print(), spelled as a function call operator
548 size_t operator() (WVSTRING_FORMAT_DECL)
549 { return write(WvString(WVSTRING_FORMAT_CALL)); }
550
    // returns the name held in my_wsname
551 const char *wsname() const
552 { return my_wsname; }
    // replaces my_wsname with the given string
553 void set_wsname(WvStringParm wsname)
554 { my_wsname = wsname; }
    // format form: builds a WvString and passes it to the overload above
555 void set_wsname(WVSTRING_FORMAT_DECL)
556 { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
557
    // type name of this class, as a fixed string
558 const char *wstype() const { return "WvStream"; }
559
    // returns the id held in my_wsid
560 WSID wsid() const { return my_wsid; }
561 static IWvStream *find_by_wsid(WSID wsid);
562
    // looks the named attribute up in attrs
563 virtual WvString getattr(WvStringParm name) const
564 { return attrs.get(name); }
565
566 // ridiculous hackery for now so that the wvstream unit test can poke
567 // around in the insides of WvStream. Eventually, inbuf will go away
568 // from the base WvStream class, so nothing like this will be needed.
569#ifdef __WVSTREAM_UNIT_TEST
570public:
    // number of elements in outbuf currently available for reading
571 size_t outbuf_used()
572 { return outbuf.used(); }
    // number of elements in inbuf currently available for reading
573 size_t inbuf_used()
574 { return inbuf.used(); }
    // hands the string t to inbuf.putstr()
575 void inbuf_putstr(WvStringParm t)
576 { inbuf.putstr(t); }
577#endif
578
579protected:
    // stores value under name in attrs; the counterpart of getattr()
580 void setattr(WvStringParm name, WvStringParm value)
581 { attrs.set(name, value); }
582 // builds the SelectInfo data structure (runs pre_select)
583 // returns true if there are callbacks to be dispatched
584 //
585 // all of the fields are filled in with new values
586 // si.msec_timeout contains the time until the next alarm expires
587 void _build_selectinfo(SelectInfo &si, time_t msec_timeout,
588 bool readable, bool writable, bool isexcept,
589 bool forceable);
590
591 // runs the actual select() function over the given
592 // SelectInfo data structure, returns the number of descriptors
593 // in the set, and sets the error code if a problem occurs
594 int _do_select(SelectInfo &si);
595
596 // processes the SelectInfo data structure (runs post_select)
597 // returns true if there are callbacks to be dispatched
598 bool _process_selectinfo(SelectInfo &si, bool forceable);
599
600 // tries to empty the output buffer if the stream is writable
601 // not quite the same as flush() since it merely empties the output
602 // buffer asynchronously whereas flush() might have other semantics
603 // also handles autoclose (eg. after flush)
604 bool flush_outbuf(time_t msec_timeout);
605
606 // called once flush() has emptied outbuf to ensure that any other
607 // internal stream buffers actually do get flushed before it returns
608 virtual bool flush_internal(time_t msec_timeout);
609
610 // the real implementations for these are actually in WvFDStream, which
611 // is where they belong. But IWvStream needs them to exist for now, so
612 // it's a hack. In standard WvStream they return -1.
613 virtual int getrfd() const;
614 virtual int getwfd() const;
615
616 // FIXME: this one is so bad, I'm not touching it. Quick hack to
617 // make it work anyway.
618 friend class WvHTTPClientProxyStream;
619
    // input and output buffers of the stream
620 WvDynBuf inbuf, outbuf;
621
622 IWvStreamCallback callfunc;
623 wv::function<void*(void*)> call_ctx;
624
625 IWvStreamCallback readcb, writecb, exceptcb, closecb;
626
    // set by outbuf_limit()
627 size_t max_outbuf_size;
    // set by delay_output()
628 bool outbuf_delayed_flush;
    // set by auto_flush()
629 bool is_auto_flush;
630
631 // Used to guard against excessive flushing when using delay_output()
632 bool want_to_flush;
633
634 // Used to ensure we don't flush recursively.
635 bool is_flushing;
636
637 size_t queue_min; // minimum bytes to read()
638 time_t autoclose_time; // close eventually, even if output is queued
639 WvTime alarm_time; // select() returns true at this time
640 WvTime last_alarm_check; // last time we checked the alarm_remaining
641
    // The callback() function calls execute(), and then calls the
    // user-specified callback if one is defined.  The base implementation
    // is empty.
652 virtual void execute()
653 { }
654
655 // every call to select() selects on the globalstream.
656 static WvStream *globalstream;
657
658 static void debugger_streams_display_header(WvStringParm cmd,
659 WvStreamsDebugger::ResultCallback result_cb);
660 static void debugger_streams_display_one_stream(WvStream *s,
661 WvStringParm cmd,
662 WvStreamsDebugger::ResultCallback result_cb);
663 static void debugger_streams_maybe_display_one_stream(WvStream *s,
664 WvStringParm cmd,
665 const WvStringList &args,
666 WvStreamsDebugger::ResultCallback result_cb);
667
668private:
    // shared implementation behind both public select() overloads, which
    // differ only in the flags they pass here
670 bool _select(time_t msec_timeout,
671 bool readable, bool writable, bool isexcept,
672 bool forceable);
673
674 void legacy_callback();
675
    // NOTE(review): copy constructor and assignment are declared private
    // with no body in this listing; presumably this forbids copying a
    // WvStream - confirm.
677 WvStream(const WvStream &s);
678 WvStream& operator= (const WvStream &s);
679 static void add_debugger_commands();
680
681 static WvString debugger_streams_run_cb(WvStringParm cmd,
682 WvStringList &args,
683 WvStreamsDebugger::ResultCallback result_cb, void *);
684 static WvString debugger_close_run_cb(WvStringParm cmd,
685 WvStringList &args,
686 WvStreamsDebugger::ResultCallback result_cb, void *);
687};
688
695extern WvStream *wvcon; // tied stdin and stdout stream
696extern WvStream *wvin; // stdin stream
697extern WvStream *wvout; // stdout stream
698extern WvStream *wverr; // stderr stream
699
700#endif // __WVSTREAM_H
Base class for different address types, each of which will have the ability to convert itself to/from...
Definition wvaddr.h:119
size_t used() const
Returns the number of elements in the buffer currently available for reading.
Definition wvbufbase.h:92
virtual void seterr(int _errnum)
Set the errnum variable – we have an error.
Definition wverror.cc:144
A WvFastString acts exactly like a WvString, but can take (const char *) strings without needing to a...
Definition wvstring.h:94
const char * cstr() const
return a (const char *) for this string.
Definition wvstring.h:267
Unified support for streams, that is, sequences of bytes that may or may not be ready for read/write ...
Definition wvstream.h:25
char * getline(double wait_msec, char separator='\n', int readahead=1024)
Auto-convert double to time_t.
Definition wvstream.h:189
bool xpost_select(SelectInfo &si, const SelectRequest &r)
Like post_select(), but still exists even if you override the other post_select() in a subclass.
Definition wvstream.h:339
size_t print(WVSTRING_FORMAT_DECL)
preformat and write() a string.
Definition wvstream.h:546
void flush_then_close(int msec_timeout)
flush the output buffer automatically as select() is called.
Definition wvstream.cc:827
virtual void execute()
The callback() function calls execute(), and then calls the user- specified callback if one is define...
Definition wvstream.h:652
IWvStreamCallback setclosecallback(IWvStreamCallback _callback)
Sets a callback to be invoked on close().
Definition wvstream.cc:1174
void queuemin(size_t count)
force read() to not return any bytes unless 'count' bytes can be read at once.
Definition wvstream.h:232
IWvStream::SelectRequest get_select_request()
Use get_select_request() to save the current state of the selection state of this stream.
Definition wvstream.cc:1020
bool post_select(SelectInfo &si, const SelectRequest &r)
A more convenient version of post_select() usable for overriding the 'want' value temporarily.
Definition wvstream.h:346
virtual bool post_select(SelectInfo &si)
post_select() is called after select(), and returns true if this object is now ready.
Definition wvstream.cc:875
virtual bool flush(time_t msec_timeout)
flush the output buffer, if we can do it without delaying more than msec_timeout milliseconds at a ti...
Definition wvstream.cc:707
void delay_output(bool is_delayed)
force write() to always buffer output.
Definition wvstream.h:246
size_t write(WvStringParm s)
print a preformatted WvString to the stream.
Definition wvstream.h:538
char * blocking_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of getline() that allows you to block for more data to arrive.
Definition wvstream.cc:602
virtual bool isreadable()
Returns true if the stream is readable.
Definition wvstream.cc:590
void auto_flush(bool is_automatic)
if true, force write() to call flush() each time, the default behaviour.
Definition wvstream.h:258
virtual bool isok() const
return true if the stream is actually usable right now
Definition wvstream.cc:445
char * continue_getline(time_t wait_msec, int separator='\n', int readahead=1024)
This is a version of blocking_getline() that uses continue_select to avoid blocking other streams.
Definition wvstream.cc:690
void undo_force_select(bool readable, bool writable, bool isexception=false)
Undo a previous force_select() - ie.
Definition wvstream.cc:1038
void autoforward(WvStream &s)
set the callback function for this stream to an internal routine that auto-forwards all incoming stre...
Definition wvstream.cc:362
virtual size_t uread(void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by read().
Definition wvstream.h:146
virtual void nowrite()
Shuts down the writing side of the stream.
Definition wvstream.cc:576
void alarm(time_t msec_timeout)
set an alarm, ie.
Definition wvstream.cc:1049
void setcallback(IWvStreamCallback _callfunc)
define the callback function for this stream, called whenever the callback() member is run,...
Definition wvstream.cc:1129
WvStream * write_requires_readable
If this is set, select() doesn't return true for write unless the given stream also returns true for ...
Definition wvstream.h:42
void force_select(bool readable, bool writable, bool isexception=false)
Use force_select() to force one or more particular modes (readable, writable, or isexception) to true...
Definition wvstream.cc:1027
WvStream()
Basic constructor for just a do-nothing WvStream.
Definition wvstream.cc:249
virtual size_t uwrite(const void *buf, size_t count)
unbuffered I/O functions; these ignore the buffer, which is handled by write().
Definition wvstream.h:156
char * getline(int wait_msec, char separator='\n', int readahead=1024)
Auto-convert int to time_t.
Definition wvstream.h:182
bool uses_continue_select
If this is set, enables the use of continue_select().
Definition wvstream.h:45
bool stop_read
True if noread()/nowrite()/close() have been called, respectively.
Definition wvstream.h:57
virtual size_t write(const void *buf, size_t count)
Write data to the stream.
Definition wvstream.cc:532
void * _callwrap(void *)
A wrapper that's compatible with WvCont, but calls the "real" callback.
Definition wvstream.cc:394
virtual bool should_flush()
Returns true if we want to flush the output buffer right now.
Definition wvstream.cc:724
virtual void pre_select(SelectInfo &si)
pre_select() sets up for eventually calling select().
Definition wvstream.cc:844
bool select(time_t msec_timeout)
Return true if any of the requested features are true on the stream.
Definition wvstream.h:376
void runonce(time_t msec_timeout=-1)
Exactly the same as: if (select(timeout)) callback();.
Definition wvstream.h:391
virtual void maybe_autoclose()
Auto-close the stream if the time is right.
Definition wvstream.cc:583
char * getline(time_t wait_msec=0, char separator='\n', int readahead=1024)
Read up to one line of data from the stream and return a pointer to the internal buffer containing th...
Definition wvstream.h:175
virtual size_t read(void *buf, size_t count)
read a data block on the stream.
Definition wvstream.cc:490
void pre_select(SelectInfo &si, const SelectRequest &r)
A more convenient version of pre_select() usable for overriding the 'want' value temporarily.
Definition wvstream.h:306
virtual const WvAddr * src() const
get the remote address from which the last data block was received.
Definition wvstream.cc:1123
void xpre_select(SelectInfo &si, const SelectRequest &r)
Like pre_select(), but still exists even if you override the other pre_select() in a subclass.
Definition wvstream.h:318
IWvStreamCallback setexceptcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is in exception state.
Definition wvstream.cc:1164
void terminate_continue_select()
you MUST run this from your destructor if you use continue_select(), or very weird things will happen...
Definition wvstream.cc:1116
void noautoforward()
Stops autoforwarding.
Definition wvstream.cc:369
void _callback()
Actually call the registered callfunc and execute().
Definition wvstream.cc:386
virtual void close()
Close the stream if it is open; isok() becomes false from now on.
Definition wvstream.cc:341
time_t alarm_remaining()
return the number of milliseconds remaining before the alarm will go off; -1 means no alarm is set (i...
Definition wvstream.cc:1058
virtual bool iswritable()
Returns true if the stream is writable (without using the outbuf).
Definition wvstream.cc:596
bool select(time_t msec_timeout, bool readable, bool writable, bool isex=false)
This version of select() sets forceable==false, so we use the exact readable/writable/isexception opt...
Definition wvstream.h:415
IWvStreamCallback setreadcallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is readable.
Definition wvstream.cc:1144
size_t personal_stack_size
Specifies the stack size to reserve for continue_select().
Definition wvstream.h:48
void drain()
drain the input buffer (read and discard data until select(0) returns false)
Definition wvstream.cc:699
bool continue_select(time_t msec_timeout)
return to the caller from execute(), but don't really return exactly; this uses WvCont::yield() to re...
Definition wvstream.cc:1089
virtual void seterr(int _errnum)
Override seterr() from WvError so that it auto-closes the stream.
Definition wvstream.cc:451
IWvStreamCallback setwritecallback(IWvStreamCallback _callback)
Sets a callback to be invoked when the stream is writable.
Definition wvstream.cc:1154
void outbuf_limit(size_t size)
set the maximum size of outbuf, beyond which a call to write() will return 0.
Definition wvstream.h:129
virtual void unread(WvBuf &outbuf, size_t count)
Puts data back into the stream's internal buffer.
Definition wvstream.cc:1190
virtual void noread()
Shuts down the reading side of the stream.
Definition wvstream.cc:569
virtual void callback()
if the stream has a callback function defined, call it now.
Definition wvstream.cc:401
WvStream * read_requires_writable
If this is set, select() doesn't return true for read unless the given stream also returns true for w...
Definition wvstream.h:36
bool alarm_was_ticking
This will be true during callback execution if the callback was triggered by the alarm going off.
Definition wvstream.h:54
This is a WvList of WvStrings, and is a really handy way to parse strings.
WvString is an implementation of a simple and efficient printable-string class.
Definition wvstring.h:330
Based on (and interchangeable with) struct timeval.
Definition wvtimeutils.h:18
the data structure used by pre_select()/post_select() and internally by select().
Definition iwvstream.h:50
A SelectRequest is a convenient way to remember what we want to do to a particular stream: read from ...
Definition iwvstream.h:34
#define IMPLEMENT_IOBJECT(component)
Helper macro to implement the IObject methods automatically.
Definition utils.h:123