GCC Code Coverage Report
File: usr.sbin/smtpd/smtpd/../scheduler_ramqueue.c
Date: 2017-11-07    Lines: 0 / 538 (0.0 %)    Branches: 0 / 378 (0.0 %)
/*	$OpenBSD: scheduler_ramqueue.c,v 1.43 2017/01/09 09:53:23 reyk Exp $	*/

/*
 * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/socket.h>

#include <ctype.h>
#include <err.h>
#include <event.h>
#include <fcntl.h>
#include <imsg.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <time.h>

#include "smtpd.h"
#include "log.h"

TAILQ_HEAD(evplist, rq_envelope);

struct rq_message {
	uint32_t		 msgid;
	struct tree		 envelopes;
};

struct rq_envelope {
	TAILQ_ENTRY(rq_envelope) entry;
	SPLAY_ENTRY(rq_envelope) t_entry;

	uint64_t		 evpid;
	uint64_t		 holdq;
	enum delivery_type	 type;

#define	RQ_EVPSTATE_PENDING	 0
#define	RQ_EVPSTATE_SCHEDULED	 1
#define	RQ_EVPSTATE_INFLIGHT	 2
#define	RQ_EVPSTATE_HELD	 3
	uint8_t			 state;

#define	RQ_ENVELOPE_EXPIRED	 0x01
#define	RQ_ENVELOPE_REMOVED	 0x02
#define	RQ_ENVELOPE_SUSPEND	 0x04
#define	RQ_ENVELOPE_UPDATE	 0x08
#define	RQ_ENVELOPE_OVERFLOW	 0x10
	uint8_t			 flags;

	time_t			 ctime;
	time_t			 sched;
	time_t			 expire;

	struct rq_message	*message;

	time_t			 t_inflight;
	time_t			 t_scheduled;
};
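
/*
 * Envelope life cycle, as implemented by the functions below: a new
 * envelope starts PENDING on the sorted q_pending list.
 * rq_envelope_schedule() moves it to SCHEDULED on one of the
 * per-disposition lists, scheduler_ram_batch() then moves it to
 * INFLIGHT, and scheduler_ram_update() sends it back to PENDING with
 * a new retry time.  scheduler_ram_hold() parks an INFLIGHT envelope
 * as HELD on a holdq until scheduler_ram_release() returns it to
 * PENDING.
 */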

struct rq_holdq {
	struct evplist		 q;
	size_t			 count;
};

struct rq_queue {
	size_t			 evpcount;
	struct tree		 messages;
	SPLAY_HEAD(prioqtree, rq_envelope)	q_priotree;

	struct evplist		 q_pending;
	struct evplist		 q_inflight;

	struct evplist		 q_mta;
	struct evplist		 q_mda;
	struct evplist		 q_bounce;
	struct evplist		 q_update;
	struct evplist		 q_expired;
	struct evplist		 q_removed;
};
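
/*
 * q_pending holds PENDING envelopes ordered by rq_envelope_cmp() (the
 * q_priotree splay tree mirrors it for fast sorted insertion), and
 * q_inflight holds INFLIGHT ones.  The remaining lists hold SCHEDULED
 * envelopes, split by what should happen to them next: dispatch
 * (q_mta, q_mda, q_bounce), notify an update (q_update), or discard
 * (q_expired, q_removed); see rq_envelope_list().
 */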

static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);

SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(const char *);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
static size_t scheduler_ram_rollback(uint32_t);
static int scheduler_ram_update(struct scheduler_info *);
static int scheduler_ram_delete(uint64_t);
static int scheduler_ram_hold(uint64_t, uint64_t);
static int scheduler_ram_release(int, uint64_t, int);
static int scheduler_ram_batch(int, int *, size_t *, uint64_t *, int *);
static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t);
static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t);
static int scheduler_ram_schedule(uint64_t);
static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
static int scheduler_ram_query(uint64_t);

static void sorted_insert(struct rq_queue *, struct rq_envelope *);

static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
static void rq_queue_dump(struct rq_queue *, const char *);
static void rq_queue_schedule(struct rq_queue *rq);
static struct evplist *rq_envelope_list(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_schedule(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_remove(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_suspend(struct rq_queue *, struct rq_envelope *);
static int rq_envelope_resume(struct rq_queue *, struct rq_envelope *);
static void rq_envelope_delete(struct rq_queue *, struct rq_envelope *);
static const char *rq_envelope_to_text(struct rq_envelope *);

struct scheduler_backend scheduler_backend_ramqueue = {
	scheduler_ram_init,

	scheduler_ram_insert,
	scheduler_ram_commit,
	scheduler_ram_rollback,

	scheduler_ram_update,
	scheduler_ram_delete,
	scheduler_ram_hold,
	scheduler_ram_release,

	scheduler_ram_batch,

	scheduler_ram_messages,
	scheduler_ram_envelopes,
	scheduler_ram_schedule,
	scheduler_ram_remove,
	scheduler_ram_suspend,
	scheduler_ram_resume,
	scheduler_ram_query,
};

static struct rq_queue	ramqueue;
static struct tree	updates;
static struct tree	holdqs[3]; /* delivery type */

static time_t		currtime;

#define BACKOFF_TRANSFER	400
#define BACKOFF_DELIVERY	10
#define BACKOFF_OVERFLOW	3

static time_t
scheduler_backoff(time_t t0, time_t base, uint32_t step)
{
	return (t0 + base * step * step);
}

static time_t
scheduler_next(time_t t0, time_t base, uint32_t step)
{
	time_t t;

	/* XXX be more efficient */
	while ((t = scheduler_backoff(t0, base, step)) <= currtime)
		step++;

	return (t);
}
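
/*
 * The retry schedule is quadratic in the retry count: attempt `step'
 * is due base * step * step seconds after creation.  For an MTA
 * envelope (base = BACKOFF_TRANSFER = 400) the retries land at +400s,
 * +1600s, +3600s, ... after t0; scheduler_next() simply skips ahead
 * to the first step whose time is still in the future.
 */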

static int
scheduler_ram_init(const char *arg)
{
	rq_queue_init(&ramqueue);
	tree_init(&updates);
	tree_init(&holdqs[D_MDA]);
	tree_init(&holdqs[D_MTA]);
	tree_init(&holdqs[D_BOUNCE]);

	return (1);
}

static int
scheduler_ram_insert(struct scheduler_info *si)
{
	struct rq_queue		*update;
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);

	/* find/prepare a ramqueue update */
	if ((update = tree_get(&updates, msgid)) == NULL) {
		update = xcalloc(1, sizeof *update, "scheduler_insert");
		stat_increment("scheduler.ramqueue.update", 1);
		rq_queue_init(update);
		tree_xset(&updates, msgid, update);
	}

	/* find/prepare the msgtree message in ramqueue update */
	if ((message = tree_get(&update->messages, msgid)) == NULL) {
		message = xcalloc(1, sizeof *message, "scheduler_insert");
		message->msgid = msgid;
		tree_init(&message->envelopes);
		tree_xset(&update->messages, msgid, message);
		stat_increment("scheduler.ramqueue.message", 1);
	}

	/* create envelope in ramqueue message */
	envelope = xcalloc(1, sizeof *envelope, "scheduler_insert");
	envelope->evpid = si->evpid;
	envelope->type = si->type;
	envelope->message = message;
	envelope->ctime = si->creation;
	envelope->expire = si->creation + si->expire;
	envelope->sched = scheduler_backoff(si->creation,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);
	tree_xset(&message->envelopes, envelope->evpid, envelope);

	update->evpcount++;
	stat_increment("scheduler.ramqueue.envelope", 1);

	envelope->state = RQ_EVPSTATE_PENDING;
	TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);

	si->nexttry = envelope->sched;

	return (1);
}
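
/*
 * Envelopes for a message are first staged in a private rq_queue kept
 * in the `updates' tree, keyed by msgid.  scheduler_ram_commit()
 * merges that staging queue into the live ramqueue once the whole
 * message has been accepted; scheduler_ram_rollback() throws it away
 * instead.
 */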

static size_t
scheduler_ram_commit(uint32_t msgid)
{
	struct rq_queue	*update;
	size_t		 r;

	currtime = time(NULL);

	update = tree_xpop(&updates, msgid);
	r = update->evpcount;

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(update, "update to commit");

	rq_queue_merge(&ramqueue, update);

	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "resulting queue");

	rq_queue_schedule(&ramqueue);

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static size_t
scheduler_ram_rollback(uint32_t msgid)
{
	struct rq_queue		*update;
	struct rq_envelope	*evp;
	size_t			 r;

	currtime = time(NULL);

	if ((update = tree_pop(&updates, msgid)) == NULL)
		return (0);
	r = update->evpcount;

	while ((evp = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, evp, entry);
		rq_envelope_delete(update, evp);
	}

	free(update);
	stat_decrement("scheduler.ramqueue.update", 1);

	return (r);
}

static int
scheduler_ram_update(struct scheduler_info *si)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(si->evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, si->evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/*
	 * If the envelope was removed while inflight, schedule it for
	 * removal immediately.
	 */
	if (evp->flags & RQ_ENVELOPE_REMOVED) {
		TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
		evp->state = RQ_EVPSTATE_SCHEDULED;
		evp->t_scheduled = currtime;
		return (1);
	}

	evp->sched = scheduler_next(evp->ctime,
	    (si->type == D_MTA) ? BACKOFF_TRANSFER : BACKOFF_DELIVERY, si->retry);

	evp->state = RQ_EVPSTATE_PENDING;
	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		sorted_insert(&ramqueue, evp);

	si->nexttry = evp->sched;

	return (1);
}

static int
scheduler_ram_delete(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	rq_envelope_delete(&ramqueue, evp);

	return (1);
}

#define HOLDQ_MAXSIZE	1000

static int
scheduler_ram_hold(uint64_t evpid, uint64_t holdq)
{
	struct rq_holdq		*hq;
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;

	currtime = time(NULL);

	msgid = evpid_to_msgid(evpid);
	msg = tree_xget(&ramqueue.messages, msgid);
	evp = tree_xget(&msg->envelopes, evpid);

	/* it *must* be in-flight */
	if (evp->state != RQ_EVPSTATE_INFLIGHT)
		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);

	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);

	/* If the envelope is suspended, just mark it as pending */
	if (evp->flags & RQ_ENVELOPE_SUSPEND) {
		evp->state = RQ_EVPSTATE_PENDING;
		return (0);
	}

	hq = tree_get(&holdqs[evp->type], holdq);
	if (hq == NULL) {
		hq = xcalloc(1, sizeof(*hq), "scheduler_hold");
		TAILQ_INIT(&hq->q);
		tree_xset(&holdqs[evp->type], holdq, hq);
		stat_increment("scheduler.ramqueue.holdq", 1);
	}

	/* If the holdq is full, just "tempfail" the envelope */
	if (hq->count >= HOLDQ_MAXSIZE) {
		evp->state = RQ_EVPSTATE_PENDING;
		evp->flags |= RQ_ENVELOPE_UPDATE;
		evp->flags |= RQ_ENVELOPE_OVERFLOW;
		sorted_insert(&ramqueue, evp);
		stat_increment("scheduler.ramqueue.hold-overflow", 1);
		return (0);
	}

	evp->state = RQ_EVPSTATE_HELD;
	evp->holdq = holdq;
	/* This is an optimization: upon release, the envelopes will be
	 * inserted in the pending queue from the first element to the last.
	 * Since elements already in the queue were received first, they
	 * were scheduled first, so they will be reinserted before the
	 * current element.
	 */
	TAILQ_INSERT_HEAD(&hq->q, evp, entry);
	hq->count += 1;
	stat_increment("scheduler.ramqueue.hold", 1);

	return (1);
}
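
/*
 * Release up to `n' envelopes from the given holdq back to the pending
 * queue; n == -1 releases them all and sets RQ_ENVELOPE_UPDATE on each,
 * so they come back out of scheduler_ram_batch() as SCHED_UPDATE rather
 * than being dispatched directly.  Returns the number released.
 */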

static int
scheduler_ram_release(int type, uint64_t holdq, int n)
{
	struct rq_holdq		*hq;
	struct rq_envelope	*evp;
	int			 i, update;

	currtime = time(NULL);

	hq = tree_get(&holdqs[type], holdq);
	if (hq == NULL)
		return (0);

	if (n == -1) {
		n = 0;
		update = 1;
	}
	else
		update = 0;

	for (i = 0; n == 0 || i < n; i++) {
		evp = TAILQ_FIRST(&hq->q);
		if (evp == NULL)
			break;

		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		evp->holdq = 0;

		/* When released, all envelopes are put in the pending queue
		 * and will be rescheduled immediately.  As an optimization,
		 * we could just schedule them directly.
		 */
		evp->state = RQ_EVPSTATE_PENDING;
		if (update)
			evp->flags |= RQ_ENVELOPE_UPDATE;
		sorted_insert(&ramqueue, evp);
	}

	if (TAILQ_EMPTY(&hq->q)) {
		tree_xpop(&holdqs[type], holdq);
		free(hq);
		stat_decrement("scheduler.ramqueue.holdq", 1);
	}
	stat_decrement("scheduler.ramqueue.hold", i);

	return (i);
}
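
/*
 * Fill the evpids/types arrays with up to *count schedulable
 * envelopes, draining the per-disposition queues round-robin so no
 * single queue starves the others.  On success *count is set to the
 * number returned; otherwise *delay is set to the time until the next
 * pending envelope is due, or -1 if there is none.
 */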

static int
scheduler_ram_batch(int mask, int *delay, size_t *count, uint64_t *evpids, int *types)
{
	struct rq_envelope	*evp;
	size_t			 i, n;
	time_t			 t;

	currtime = time(NULL);

	rq_queue_schedule(&ramqueue);
	if (tracing & TRACE_SCHEDULER)
		rq_queue_dump(&ramqueue, "scheduler_ram_batch()");

	i = 0;
	n = 0;

	for (;;) {

		if (mask & SCHED_REMOVE && (evp = TAILQ_FIRST(&ramqueue.q_removed))) {
			TAILQ_REMOVE(&ramqueue.q_removed, evp, entry);
			types[i] = SCHED_REMOVE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_EXPIRE && (evp = TAILQ_FIRST(&ramqueue.q_expired))) {
			TAILQ_REMOVE(&ramqueue.q_expired, evp, entry);
			types[i] = SCHED_EXPIRE;
			evpids[i] = evp->evpid;
			rq_envelope_delete(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_UPDATE && (evp = TAILQ_FIRST(&ramqueue.q_update))) {
			TAILQ_REMOVE(&ramqueue.q_update, evp, entry);
			types[i] = SCHED_UPDATE;
			evpids[i] = evp->evpid;

			if (evp->flags & RQ_ENVELOPE_OVERFLOW)
				t = BACKOFF_OVERFLOW;
			else if (evp->type == D_MTA)
				t = BACKOFF_TRANSFER;
			else
				t = BACKOFF_DELIVERY;

			evp->sched = scheduler_next(evp->ctime, t, 0);
			evp->flags &= ~(RQ_ENVELOPE_UPDATE|RQ_ENVELOPE_OVERFLOW);
			evp->state = RQ_EVPSTATE_PENDING;
			if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
				sorted_insert(&ramqueue, evp);

			if (++i == *count)
				break;
		}

		if (mask & SCHED_BOUNCE && (evp = TAILQ_FIRST(&ramqueue.q_bounce))) {
			TAILQ_REMOVE(&ramqueue.q_bounce, evp, entry);
			types[i] = SCHED_BOUNCE;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MDA && (evp = TAILQ_FIRST(&ramqueue.q_mda))) {
			TAILQ_REMOVE(&ramqueue.q_mda, evp, entry);
			types[i] = SCHED_MDA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		if (mask & SCHED_MTA && (evp = TAILQ_FIRST(&ramqueue.q_mta))) {
			TAILQ_REMOVE(&ramqueue.q_mta, evp, entry);
			types[i] = SCHED_MTA;
			evpids[i] = evp->evpid;

			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
			evp->state = RQ_EVPSTATE_INFLIGHT;
			evp->t_inflight = currtime;

			if (++i == *count)
				break;
		}

		/* nothing seen this round */
		if (i == n)
			break;

		n = i;
	}

	if (i) {
		*count = i;
		return (1);
	}

	if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
		if (evp->sched < evp->expire)
			t = evp->sched;
		else
			t = evp->expire;
		*delay = (t < currtime) ? 0 : (t - currtime);
	}
	else
		*delay = -1;

	return (0);
}

static size_t
scheduler_ram_messages(uint32_t from, uint32_t *dst, size_t size)
{
	uint64_t id;
	size_t	 n;
	void	*i;

	for (n = 0, i = NULL; n < size; n++) {
		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
			break;
		dst[n] = id;
	}

	return (n);
}

static size_t
scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	void			*i;
	size_t			 n;

	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
		return (0);

	for (n = 0, i = NULL; n < size; ) {

		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
		    (void**)&evp) == 0)
			break;

		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
			continue;

		dst[n].evpid = evp->evpid;
		dst[n].flags = 0;
		dst[n].retry = 0;
		dst[n].time = 0;

		if (evp->state == RQ_EVPSTATE_PENDING) {
			dst[n].time = evp->sched;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
		}
		else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
			dst[n].time = evp->t_inflight;
			dst[n].flags = EF_INFLIGHT;
		}
		else if (evp->state == RQ_EVPSTATE_HELD) {
			/* same as scheduled */
			dst[n].time = evp->t_scheduled;
			dst[n].flags = EF_PENDING;
			dst[n].flags |= EF_HOLD;
		}
		if (evp->flags & RQ_ENVELOPE_SUSPEND)
			dst[n].flags |= EF_SUSPEND;

		n++;
	}

	return (n);
}

static int
scheduler_ram_schedule(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (evp->state == RQ_EVPSTATE_INFLIGHT)
			return (0);
		rq_envelope_schedule(&ramqueue, evp);
		return (1);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
			if (evp->state == RQ_EVPSTATE_INFLIGHT)
				continue;
			rq_envelope_schedule(&ramqueue, evp);
			r++;
		}
		return (r);
	}
}

static int
scheduler_ram_remove(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_remove(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_remove(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_suspend(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_suspend(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_suspend(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_resume(uint64_t evpid)
{
	struct rq_message	*msg;
	struct rq_envelope	*evp;
	uint32_t		 msgid;
	void			*i;
	int			 r;

	currtime = time(NULL);

	if (evpid > 0xffffffff) {
		msgid = evpid_to_msgid(evpid);
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
			return (0);
		if (rq_envelope_resume(&ramqueue, evp))
			return (1);
		return (0);
	}
	else {
		msgid = evpid;
		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
			return (0);
		i = NULL;
		r = 0;
		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
			if (rq_envelope_resume(&ramqueue, evp))
				r++;
		return (r);
	}
}

static int
scheduler_ram_query(uint64_t evpid)
{
	uint32_t msgid;

	if (evpid > 0xffffffff)
		msgid = evpid_to_msgid(evpid);
	else
		msgid = evpid;

	if (tree_get(&ramqueue.messages, msgid) == NULL)
		return (0);

	return (1);
}

static void
sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_envelope	*evp2;

	SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
	evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
	if (evp2)
		TAILQ_INSERT_BEFORE(evp2, evp, entry);
	else
		TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}
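
/*
 * sorted_insert() keeps q_pending and q_priotree in sync: the splay
 * tree finds the in-order successor in amortized logarithmic time,
 * which gives the insertion point in the list without a linear scan.
 */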

static void
rq_queue_init(struct rq_queue *rq)
{
	memset(rq, 0, sizeof *rq);
	tree_init(&rq->messages);
	TAILQ_INIT(&rq->q_pending);
	TAILQ_INIT(&rq->q_inflight);
	TAILQ_INIT(&rq->q_mta);
	TAILQ_INIT(&rq->q_mda);
	TAILQ_INIT(&rq->q_bounce);
	TAILQ_INIT(&rq->q_update);
	TAILQ_INIT(&rq->q_expired);
	TAILQ_INIT(&rq->q_removed);
	SPLAY_INIT(&rq->q_priotree);
}

static void
rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
{
	struct rq_message	*message, *tomessage;
	struct rq_envelope	*envelope;
	uint64_t		 id;
	void			*i;

	while (tree_poproot(&update->messages, &id, (void*)&message)) {
		if ((tomessage = tree_get(&rq->messages, id)) == NULL) {
			/* message does not exist. re-use structure */
			tree_xset(&rq->messages, id, message);
			continue;
		}
		/* need to re-link all envelopes before merging them */
		i = NULL;
		while ((tree_iter(&message->envelopes, &i, &id,
		    (void*)&envelope)))
			envelope->message = tomessage;
		tree_merge(&tomessage->envelopes, &message->envelopes);
		free(message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	/* Sorted insert in the pending queue */
	while ((envelope = TAILQ_FIRST(&update->q_pending))) {
		TAILQ_REMOVE(&update->q_pending, envelope, entry);
		sorted_insert(rq, envelope);
	}

	rq->evpcount += update->evpcount;
}

#define SCHEDULEMAX	1024

static void
rq_queue_schedule(struct rq_queue *rq)
{
	struct rq_envelope	*evp;
	size_t			 n;

	n = 0;
	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
		if (evp->sched > currtime && evp->expire > currtime)
			break;

		if (n == SCHEDULEMAX)
			break;

		if (evp->state != RQ_EVPSTATE_PENDING)
			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
			    evp->flags);

		if (evp->expire <= currtime) {
			TAILQ_REMOVE(&rq->q_pending, evp, entry);
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
			evp->state = RQ_EVPSTATE_SCHEDULED;
			evp->flags |= RQ_ENVELOPE_EXPIRED;
			evp->t_scheduled = currtime;
			continue;
		}
		rq_envelope_schedule(rq, evp);
		n += 1;
	}
}

static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
	switch (evp->state) {
	case RQ_EVPSTATE_PENDING:
		return &rq->q_pending;

	case RQ_EVPSTATE_SCHEDULED:
		if (evp->flags & RQ_ENVELOPE_EXPIRED)
			return &rq->q_expired;
		if (evp->flags & RQ_ENVELOPE_REMOVED)
			return &rq->q_removed;
		if (evp->flags & RQ_ENVELOPE_UPDATE)
			return &rq->q_update;
		if (evp->type == D_MTA)
			return &rq->q_mta;
		if (evp->type == D_MDA)
			return &rq->q_mda;
		if (evp->type == D_BOUNCE)
			return &rq->q_bounce;
		errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);

	case RQ_EVPSTATE_INFLIGHT:
		return &rq->q_inflight;

	case RQ_EVPSTATE_HELD:
		return (NULL);
	}

	errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state);
	return (NULL);
}

static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*q = NULL;

	switch (evp->type) {
	case D_MTA:
		q = &rq->q_mta;
		break;
	case D_MDA:
		q = &rq->q_mda;
		break;
	case D_BOUNCE:
		q = &rq->q_bounce;
		break;
	}

	if (evp->flags & RQ_ENVELOPE_UPDATE)
		q = &rq->q_update;

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		TAILQ_REMOVE(&rq->q_pending, evp, entry);
		SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(q, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->t_scheduled = currtime;
}

static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
		return (0);
	/*
	 * If the envelope is in-flight, just mark it for removal.
	 */
	if (evp->state == RQ_EVPSTATE_INFLIGHT) {
		evp->flags |= RQ_ENVELOPE_REMOVED;
		return (1);
	}

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
	evp->state = RQ_EVPSTATE_SCHEDULED;
	evp->flags |= RQ_ENVELOPE_REMOVED;
	evp->t_scheduled = currtime;

	return (1);
}

static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct rq_holdq	*hq;
	struct evplist	*evl;

	if (evp->flags & RQ_ENVELOPE_SUSPEND)
		return (0);

	if (evp->state == RQ_EVPSTATE_HELD) {
		hq = tree_xget(&holdqs[evp->type], evp->holdq);
		TAILQ_REMOVE(&hq->q, evp, entry);
		hq->count -= 1;
		if (TAILQ_EMPTY(&hq->q)) {
			tree_xpop(&holdqs[evp->type], evp->holdq);
			free(hq);
		}
		evp->holdq = 0;
		evp->state = RQ_EVPSTATE_PENDING;
		stat_decrement("scheduler.ramqueue.hold", 1);
	}
	else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		TAILQ_REMOVE(evl, evp, entry);
		if (evl == &rq->q_pending)
			SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
	}

	evp->flags |= RQ_ENVELOPE_SUSPEND;

	return (1);
}

static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
	struct evplist	*evl;

	if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
		return (0);

	if (evp->state != RQ_EVPSTATE_INFLIGHT) {
		evl = rq_envelope_list(rq, evp);
		if (evl == &rq->q_pending)
			sorted_insert(rq, evp);
		else
			TAILQ_INSERT_TAIL(evl, evp, entry);
	}

	evp->flags &= ~RQ_ENVELOPE_SUSPEND;

	return (1);
}

static void
rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
{
	tree_xpop(&evp->message->envelopes, evp->evpid);
	if (tree_empty(&evp->message->envelopes)) {
		tree_xpop(&rq->messages, evp->message->msgid);
		free(evp->message);
		stat_decrement("scheduler.ramqueue.message", 1);
	}

	free(evp);
	rq->evpcount--;
	stat_decrement("scheduler.ramqueue.envelope", 1);
}

static const char *
rq_envelope_to_text(struct rq_envelope *e)
{
	static char	buf[256];
	char		t[64];

	(void)snprintf(buf, sizeof buf, "evp:%016" PRIx64 " [", e->evpid);

	if (e->type == D_BOUNCE)
		(void)strlcat(buf, "bounce", sizeof buf);
	else if (e->type == D_MDA)
		(void)strlcat(buf, "mda", sizeof buf);
	else if (e->type == D_MTA)
		(void)strlcat(buf, "mta", sizeof buf);

	(void)snprintf(t, sizeof t, ",expire=%s",
	    duration_to_text(e->expire - currtime));
	(void)strlcat(buf, t, sizeof buf);

	switch (e->state) {
	case RQ_EVPSTATE_PENDING:
		(void)snprintf(t, sizeof t, ",pending=%s",
		    duration_to_text(e->sched - currtime));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_SCHEDULED:
		(void)snprintf(t, sizeof t, ",scheduled=%s",
		    duration_to_text(currtime - e->t_scheduled));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_INFLIGHT:
		(void)snprintf(t, sizeof t, ",inflight=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;

	case RQ_EVPSTATE_HELD:
		(void)snprintf(t, sizeof t, ",held=%s",
		    duration_to_text(currtime - e->t_inflight));
		(void)strlcat(buf, t, sizeof buf);
		break;
	default:
		errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state);
	}

	if (e->flags & RQ_ENVELOPE_REMOVED)
		(void)strlcat(buf, ",removed", sizeof buf);
	if (e->flags & RQ_ENVELOPE_EXPIRED)
		(void)strlcat(buf, ",expired", sizeof buf);
	if (e->flags & RQ_ENVELOPE_SUSPEND)
		(void)strlcat(buf, ",suspended", sizeof buf);

	(void)strlcat(buf, "]", sizeof buf);

	return (buf);
}

static void
rq_queue_dump(struct rq_queue *rq, const char * name)
{
	struct rq_message	*message;
	struct rq_envelope	*envelope;
	void			*i, *j;
	uint64_t		 id;

	log_debug("debug: /--- ramqueue: %s", name);

	i = NULL;
	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
		log_debug("debug: | msg:%08" PRIx32, message->msgid);
		j = NULL;
		while ((tree_iter(&message->envelopes, &j, &id,
		    (void*)&envelope)))
			log_debug("debug: |   %s",
			    rq_envelope_to_text(envelope));
	}
	log_debug("debug: \\---");
}

static int
rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2)
{
	time_t	ref1, ref2;

	ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
	ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
	if (ref1 != ref2)
		return (ref1 < ref2) ? -1 : 1;

	if (e1->evpid != e2->evpid)
		return (e1->evpid < e2->evpid) ? -1 : 1;

	return 0;
}
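
/*
 * Pending envelopes sort by whichever comes first, next retry or
 * expiry, with the evpid as tie-breaker so the ordering is total.
 * E.g. an envelope whose next try is in 3600s but which expires in
 * 60s sorts ahead of one due for retry in 300s, so expiring mail is
 * pulled off q_pending (and reported expired) as early as possible.
 */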

SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);