GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: lib/libevent/evbuffer.c Lines: 91 159 57.2 %
Date: 2017-11-13 Branches: 42 108 38.9 %

Line Branch Exec Source
1
/*	$OpenBSD: evbuffer.c,v 1.17 2014/10/30 16:45:37 bluhm Exp $	*/
2
3
/*
4
 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5
 * All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions
9
 * are met:
10
 * 1. Redistributions of source code must retain the above copyright
11
 *    notice, this list of conditions and the following disclaimer.
12
 * 2. Redistributions in binary form must reproduce the above copyright
13
 *    notice, this list of conditions and the following disclaimer in the
14
 *    documentation and/or other materials provided with the distribution.
15
 * 3. The name of the author may not be used to endorse or promote products
16
 *    derived from this software without specific prior written permission.
17
 *
18
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
 */
29
30
#include <sys/types.h>
31
#include <sys/time.h>
32
33
#include <errno.h>
34
#include <stdio.h>
35
#include <stdlib.h>
36
#include <string.h>
37
#include <stdarg.h>
38
39
#include "event.h"
40
41
/* prototypes */
42
43
void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
44
45
/*
 * Schedule ev via event_add().
 *
 * A timeout of zero schedules the event without any timeout; any other
 * value is used as a whole number of seconds.  The result of event_add()
 * is returned unchanged.
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval expiry;

	if (timeout == 0)
		return (event_add(ev, NULL));

	expiry.tv_sec = timeout;
	expiry.tv_usec = 0;

	return (event_add(ev, &expiry));
}
58
59
/*
60
 * This callback is executed when the size of the input buffer changes.
61
 * We use it to apply back pressure on the reading side.
62
 */
63
64
void
65
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
66
    void *arg) {
67
117036
	struct bufferevent *bufev = arg;
68
	/*
69
	 * If we are below the watermark then reschedule reading if it's
70
	 * still enabled.
71
	 */
72

117036
	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
73
58518
		evbuffer_setcb(buf, NULL, NULL);
74
75
58518
		if (bufev->enabled & EV_READ)
76
58518
			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
77
	}
78
58518
}
79
80
static void
81
bufferevent_readcb(int fd, short event, void *arg)
82
{
83
117108
	struct bufferevent *bufev = arg;
84
	int res = 0;
85
	short what = EVBUFFER_READ;
86
	size_t len;
87
	int howmuch = -1;
88
89
58554
	if (event == EV_TIMEOUT) {
90
		what |= EVBUFFER_TIMEOUT;
91
		goto error;
92
	}
93
94
	/*
95
	 * If we have a high watermark configured then we don't want to
96
	 * read more data than would make us reach the watermark.
97
	 */
98
58554
	if (bufev->wm_read.high != 0) {
99
58500
		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
100
		/* we might have lowered the watermark, stop reading */
101
58500
		if (howmuch <= 0) {
102
			struct evbuffer *buf = bufev->input;
103
			event_del(&bufev->ev_read);
104
			evbuffer_setcb(buf,
105
			    bufferevent_read_pressure_cb, bufev);
106
			return;
107
		}
108
	}
109
110
58554
	res = evbuffer_read(bufev->input, fd, howmuch);
111
58554
	if (res == -1) {
112
		if (errno == EAGAIN || errno == EINTR)
113
			goto reschedule;
114
		/* error case */
115
		what |= EVBUFFER_ERROR;
116
58554
	} else if (res == 0) {
117
		/* eof case */
118
		what |= EVBUFFER_EOF;
119
	}
120
121
58554
	if (res <= 0)
122
		goto error;
123
124
58554
	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
125
126
	/* See if this callbacks meets the water marks */
127
58554
	len = EVBUFFER_LENGTH(bufev->input);
128

117054
	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
129
		return;
130

117054
	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
131
58500
		struct evbuffer *buf = bufev->input;
132
58500
		event_del(&bufev->ev_read);
133
134
		/* Now schedule a callback for us when the buffer changes */
135
58500
		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
136
58500
	}
137
138
	/* Invoke the user callback - must always be called last */
139
58554
	if (bufev->readcb != NULL)
140
58554
		(*bufev->readcb)(bufev, bufev->cbarg);
141
58554
	return;
142
143
 reschedule:
144
	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
145
	return;
146
147
 error:
148
	(*bufev->errorcb)(bufev, what, bufev->cbarg);
149
58554
}
150
151
static void
152
bufferevent_writecb(int fd, short event, void *arg)
153
{
154
1224
	struct bufferevent *bufev = arg;
155
	int res = 0;
156
	short what = EVBUFFER_WRITE;
157
158
612
	if (event == EV_TIMEOUT) {
159
		what |= EVBUFFER_TIMEOUT;
160
		goto error;
161
	}
162
163
612
	if (EVBUFFER_LENGTH(bufev->output)) {
164
612
	    res = evbuffer_write(bufev->output, fd);
165
612
	    if (res == -1) {
166
		    if (errno == EAGAIN ||
167
			errno == EINTR ||
168
			errno == EINPROGRESS)
169
			    goto reschedule;
170
		    /* error case */
171
		    what |= EVBUFFER_ERROR;
172
612
	    } else if (res == 0) {
173
		    /* eof case */
174
		    what |= EVBUFFER_EOF;
175
	    }
176
612
	    if (res <= 0)
177
		    goto error;
178
	}
179
180
612
	if (EVBUFFER_LENGTH(bufev->output) != 0)
181
576
		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
182
183
	/*
184
	 * Invoke the user callback if our buffer is drained or below the
185
	 * low watermark.
186
	 */
187

1224
	if (bufev->writecb != NULL &&
188
612
	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
189
36
		(*bufev->writecb)(bufev, bufev->cbarg);
190
191
612
	return;
192
193
 reschedule:
194
	if (EVBUFFER_LENGTH(bufev->output) != 0)
195
		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
196
	return;
197
198
 error:
199
	(*bufev->errorcb)(bufev, what, bufev->cbarg);
200
612
}
201
202
/*
203
 * Create a new buffered event object.
204
 *
205
 * The read callback is invoked whenever we read new data.
206
 * The write callback is invoked whenever the output buffer is drained.
207
 * The error callback is invoked on a write/read error or on EOF.
208
 *
209
 * Both read and write callbacks may be NULL.  The error callback is not
210
 * allowed to be NULL and must always be provided.
211
 */
212
213
struct bufferevent *
214
bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
215
    everrorcb errorcb, void *cbarg)
216
{
217
	struct bufferevent *bufev;
218
219
144
	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
220
		return (NULL);
221
222
72
	if ((bufev->input = evbuffer_new()) == NULL) {
223
		free(bufev);
224
		return (NULL);
225
	}
226
227
72
	if ((bufev->output = evbuffer_new()) == NULL) {
228
		evbuffer_free(bufev->input);
229
		free(bufev);
230
		return (NULL);
231
	}
232
233
72
	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
234
72
	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
235
236
72
	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
237
238
	/*
239
	 * Set to EV_WRITE so that using bufferevent_write is going to
240
	 * trigger a callback.  Reading needs to be explicitly enabled
241
	 * because otherwise no data will be available.
242
	 */
243
72
	bufev->enabled = EV_WRITE;
244
245
72
	return (bufev);
246
72
}
247
248
void
249
bufferevent_setcb(struct bufferevent *bufev,
250
    evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
251
{
252
144
	bufev->readcb = readcb;
253
72
	bufev->writecb = writecb;
254
72
	bufev->errorcb = errorcb;
255
256
72
	bufev->cbarg = cbarg;
257
72
}
258
259
void
260
bufferevent_setfd(struct bufferevent *bufev, int fd)
261
{
262
	event_del(&bufev->ev_read);
263
	event_del(&bufev->ev_write);
264
265
	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
266
	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
267
	if (bufev->ev_base != NULL) {
268
		event_base_set(bufev->ev_base, &bufev->ev_read);
269
		event_base_set(bufev->ev_base, &bufev->ev_write);
270
	}
271
272
	/* might have to manually trigger event registration */
273
}
274
275
int
276
bufferevent_priority_set(struct bufferevent *bufev, int priority)
277
{
278
	if (event_priority_set(&bufev->ev_read, priority) == -1)
279
		return (-1);
280
	if (event_priority_set(&bufev->ev_write, priority) == -1)
281
		return (-1);
282
283
	return (0);
284
}
285
286
/* Closing the file descriptor is the responsibility of the caller */
287
288
void
289
bufferevent_free(struct bufferevent *bufev)
290
{
291
144
	event_del(&bufev->ev_read);
292
72
	event_del(&bufev->ev_write);
293
294
72
	evbuffer_free(bufev->input);
295
72
	evbuffer_free(bufev->output);
296
297
72
	free(bufev);
298
72
}
299
300
/*
301
 * Returns 0 on success;
302
 *        -1 on failure.
303
 */
304
305
int
306
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
307
{
308
	int res;
309
310
72
	res = evbuffer_add(bufev->output, data, size);
311
312
36
	if (res == -1)
313
		return (res);
314
315
	/* If everything is okay, we need to schedule a write */
316

72
	if (size > 0 && (bufev->enabled & EV_WRITE))
317
36
		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
318
319
36
	return (res);
320
36
}
321
322
int
323
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
324
{
325
	int res;
326
327
	res = bufferevent_write(bufev, buf->buffer, buf->off);
328
	if (res != -1)
329
		evbuffer_drain(buf, buf->off);
330
331
	return (res);
332
}
333
334
size_t
335
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
336
{
337
	struct evbuffer *buf = bufev->input;
338
339
	if (buf->off < size)
340
		size = buf->off;
341
342
	/* Copy the available data to the user buffer */
343
	memcpy(data, buf->buffer, size);
344
345
	if (size)
346
		evbuffer_drain(buf, size);
347
348
	return (size);
349
}
350
351
int
352
bufferevent_enable(struct bufferevent *bufev, short event)
353
{
354
72
	if (event & EV_READ) {
355
36
		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
356
			return (-1);
357
	}
358
36
	if (event & EV_WRITE) {
359
		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
360
			return (-1);
361
	}
362
363
36
	bufev->enabled |= event;
364
36
	return (0);
365
36
}
366
367
int
368
bufferevent_disable(struct bufferevent *bufev, short event)
369
{
370
144
	if (event & EV_READ) {
371
72
		if (event_del(&bufev->ev_read) == -1)
372
			return (-1);
373
	}
374
72
	if (event & EV_WRITE) {
375
		if (event_del(&bufev->ev_write) == -1)
376
			return (-1);
377
	}
378
379
72
	bufev->enabled &= ~event;
380
72
	return (0);
381
72
}
382
383
/*
384
 * Sets the read and write timeout for a buffered event.
385
 */
386
387
void
388
bufferevent_settimeout(struct bufferevent *bufev,
389
    int timeout_read, int timeout_write) {
390
	bufev->timeout_read = timeout_read;
391
	bufev->timeout_write = timeout_write;
392
393
	if (event_pending(&bufev->ev_read, EV_READ, NULL))
394
		bufferevent_add(&bufev->ev_read, timeout_read);
395
	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
396
		bufferevent_add(&bufev->ev_write, timeout_write);
397
}
398
399
/*
400
 * Sets the water marks
401
 */
402
403
void
404
bufferevent_setwatermark(struct bufferevent *bufev, short events,
405
    size_t lowmark, size_t highmark)
406
{
407
36
	if (events & EV_READ) {
408
18
		bufev->wm_read.low = lowmark;
409
18
		bufev->wm_read.high = highmark;
410
18
	}
411
412
18
	if (events & EV_WRITE) {
413
		bufev->wm_write.low = lowmark;
414
		bufev->wm_write.high = highmark;
415
	}
416
417
	/* If the watermarks changed then see if we should call read again */
418
36
	bufferevent_read_pressure_cb(bufev->input,
419
18
	    0, EVBUFFER_LENGTH(bufev->input), bufev);
420
18
}
421
422
int
423
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
424
{
425
	int res;
426
427
	bufev->ev_base = base;
428
429
	res = event_base_set(base, &bufev->ev_read);
430
	if (res == -1)
431
		return (res);
432
433
	res = event_base_set(base, &bufev->ev_write);
434
	return (res);
435
}