GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: usr.sbin/smtpd/smtpd/../scheduler.c Lines: 0 369 0.0 %
Date: 2017-11-13 Branches: 0 92 0.0 %

Line Branch Exec Source
1
/*	$OpenBSD: scheduler.c,v 1.56 2017/01/09 14:49:22 reyk Exp $	*/
2
3
/*
4
 * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
5
 * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
6
 * Copyright (c) 2008-2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
7
 * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
8
 *
9
 * Permission to use, copy, modify, and distribute this software for any
10
 * purpose with or without fee is hereby granted, provided that the above
11
 * copyright notice and this permission notice appear in all copies.
12
 *
13
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
14
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
16
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
17
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
18
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
19
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20
 */
21
22
#include <sys/types.h>
23
#include <sys/queue.h>
24
#include <sys/tree.h>
25
#include <sys/socket.h>
26
#include <sys/stat.h>
27
28
#include <ctype.h>
29
#include <dirent.h>
30
#include <err.h>
31
#include <errno.h>
32
#include <event.h>
33
#include <imsg.h>
34
#include <inttypes.h>
35
#include <libgen.h>
36
#include <pwd.h>
37
#include <signal.h>
38
#include <stdio.h>
39
#include <stdlib.h>
40
#include <string.h>
41
#include <time.h>
42
#include <unistd.h>
43
#include <limits.h>
44
45
#include "smtpd.h"
46
#include "log.h"
47
48
static void scheduler_imsg(struct mproc *, struct imsg *);
49
static void scheduler_shutdown(void);
50
static void scheduler_reset_events(void);
51
static void scheduler_timeout(int, short, void *);
52
53
static struct scheduler_backend *backend = NULL;
54
static struct event		 ev;
55
static size_t			 ninflight = 0;
56
static int			*types;
57
static uint64_t			*evpids;
58
static uint32_t			*msgids;
59
static struct evpstate		*state;
60
61
extern const char *backend_scheduler;
62
63
void
64
scheduler_imsg(struct mproc *p, struct imsg *imsg)
65
{
66
	struct bounce_req_msg	 req;
67
	struct envelope		 evp;
68
	struct scheduler_info	 si;
69
	struct msg		 m;
70
	uint64_t		 evpid, id, holdq;
71
	uint32_t		 msgid;
72
	uint32_t       		 inflight;
73
	size_t			 n, i;
74
	time_t			 timestamp;
75
	int			 v, r, type;
76
77
	if (imsg == NULL)
78
		scheduler_shutdown();
79
80
	switch (imsg->hdr.type) {
81
82
	case IMSG_QUEUE_ENVELOPE_SUBMIT:
83
		m_msg(&m, imsg);
84
		m_get_envelope(&m, &evp);
85
		m_end(&m);
86
		log_trace(TRACE_SCHEDULER,
87
		    "scheduler: inserting evp:%016" PRIx64, evp.id);
88
		scheduler_info(&si, &evp);
89
		stat_increment("scheduler.envelope.incoming", 1);
90
		backend->insert(&si);
91
		return;
92
93
	case IMSG_QUEUE_MESSAGE_COMMIT:
94
		m_msg(&m, imsg);
95
		m_get_msgid(&m, &msgid);
96
		m_end(&m);
97
		log_trace(TRACE_SCHEDULER,
98
		    "scheduler: committing msg:%08" PRIx32, msgid);
99
		n = backend->commit(msgid);
100
		stat_decrement("scheduler.envelope.incoming", n);
101
		stat_increment("scheduler.envelope", n);
102
		scheduler_reset_events();
103
		return;
104
105
	case IMSG_QUEUE_DISCOVER_EVPID:
106
		m_msg(&m, imsg);
107
		m_get_envelope(&m, &evp);
108
		m_end(&m);
109
		r = backend->query(evp.id);
110
		if (r) {
111
			log_debug("debug: scheduler: evp:%016" PRIx64
112
			    " already scheduled", evp.id);
113
			return;
114
		}
115
		log_trace(TRACE_SCHEDULER,
116
		    "scheduler: discovering evp:%016" PRIx64, evp.id);
117
		scheduler_info(&si, &evp);
118
		stat_increment("scheduler.envelope.incoming", 1);
119
		backend->insert(&si);
120
		return;
121
122
	case IMSG_QUEUE_DISCOVER_MSGID:
123
		m_msg(&m, imsg);
124
		m_get_msgid(&m, &msgid);
125
		m_end(&m);
126
		r = backend->query(msgid);
127
		if (r) {
128
			log_debug("debug: scheduler: msgid:%08" PRIx32
129
			    " already scheduled", msgid);
130
			return;
131
		}
132
		log_trace(TRACE_SCHEDULER,
133
		    "scheduler: committing msg:%08" PRIx32, msgid);
134
		n = backend->commit(msgid);
135
		stat_decrement("scheduler.envelope.incoming", n);
136
		stat_increment("scheduler.envelope", n);
137
		scheduler_reset_events();
138
		return;
139
140
	case IMSG_QUEUE_MESSAGE_ROLLBACK:
141
		m_msg(&m, imsg);
142
		m_get_msgid(&m, &msgid);
143
		m_end(&m);
144
		log_trace(TRACE_SCHEDULER, "scheduler: aborting msg:%08" PRIx32,
145
		    msgid);
146
		n = backend->rollback(msgid);
147
		stat_decrement("scheduler.envelope.incoming", n);
148
		scheduler_reset_events();
149
		return;
150
151
	case IMSG_QUEUE_ENVELOPE_REMOVE:
152
		m_msg(&m, imsg);
153
		m_get_evpid(&m, &evpid);
154
		m_get_u32(&m, &inflight);
155
		m_end(&m);
156
		log_trace(TRACE_SCHEDULER,
157
		    "scheduler: queue requested removal of evp:%016" PRIx64,
158
		    evpid);
159
		stat_decrement("scheduler.envelope", 1);
160
		if (!inflight)
161
			backend->remove(evpid);
162
		else {
163
			backend->delete(evpid);
164
			ninflight -= 1;
165
			stat_decrement("scheduler.envelope.inflight", 1);
166
		}
167
168
		scheduler_reset_events();
169
		return;
170
171
	case IMSG_QUEUE_ENVELOPE_ACK:
172
		m_msg(&m, imsg);
173
		m_get_evpid(&m, &evpid);
174
		m_end(&m);
175
		log_trace(TRACE_SCHEDULER,
176
		    "scheduler: queue ack removal of evp:%016" PRIx64,
177
		    evpid);
178
		ninflight -= 1;
179
		stat_decrement("scheduler.envelope.inflight", 1);
180
		scheduler_reset_events();
181
		return;
182
183
	case IMSG_QUEUE_DELIVERY_OK:
184
		m_msg(&m, imsg);
185
		m_get_evpid(&m, &evpid);
186
		m_end(&m);
187
		log_trace(TRACE_SCHEDULER,
188
		    "scheduler: deleting evp:%016" PRIx64 " (ok)", evpid);
189
		backend->delete(evpid);
190
		ninflight -= 1;
191
		stat_increment("scheduler.delivery.ok", 1);
192
		stat_decrement("scheduler.envelope.inflight", 1);
193
		stat_decrement("scheduler.envelope", 1);
194
		scheduler_reset_events();
195
		return;
196
197
	case IMSG_QUEUE_DELIVERY_TEMPFAIL:
198
		m_msg(&m, imsg);
199
		m_get_envelope(&m, &evp);
200
		m_end(&m);
201
		log_trace(TRACE_SCHEDULER,
202
		    "scheduler: updating evp:%016" PRIx64, evp.id);
203
		scheduler_info(&si, &evp);
204
		backend->update(&si);
205
		ninflight -= 1;
206
		stat_increment("scheduler.delivery.tempfail", 1);
207
		stat_decrement("scheduler.envelope.inflight", 1);
208
209
		for (i = 0; i < MAX_BOUNCE_WARN; i++) {
210
			if (env->sc_bounce_warn[i] == 0)
211
				break;
212
			timestamp = si.creation + env->sc_bounce_warn[i];
213
			if (si.nexttry >= timestamp &&
214
			    si.lastbounce < timestamp) {
215
	    			req.evpid = evp.id;
216
				req.timestamp = timestamp;
217
				req.bounce.type = B_WARNING;
218
				req.bounce.delay = env->sc_bounce_warn[i];
219
				req.bounce.expire = si.expire;
220
				m_compose(p, IMSG_SCHED_ENVELOPE_BOUNCE, 0, 0, -1,
221
				    &req, sizeof req);
222
				break;
223
			}
224
		}
225
		scheduler_reset_events();
226
		return;
227
228
	case IMSG_QUEUE_DELIVERY_PERMFAIL:
229
		m_msg(&m, imsg);
230
		m_get_evpid(&m, &evpid);
231
		m_end(&m);
232
		log_trace(TRACE_SCHEDULER,
233
		    "scheduler: deleting evp:%016" PRIx64 " (fail)", evpid);
234
		backend->delete(evpid);
235
		ninflight -= 1;
236
		stat_increment("scheduler.delivery.permfail", 1);
237
		stat_decrement("scheduler.envelope.inflight", 1);
238
		stat_decrement("scheduler.envelope", 1);
239
		scheduler_reset_events();
240
		return;
241
242
	case IMSG_QUEUE_DELIVERY_LOOP:
243
		m_msg(&m, imsg);
244
		m_get_evpid(&m, &evpid);
245
		m_end(&m);
246
		log_trace(TRACE_SCHEDULER,
247
		    "scheduler: deleting evp:%016" PRIx64 " (loop)", evpid);
248
		backend->delete(evpid);
249
		ninflight -= 1;
250
		stat_increment("scheduler.delivery.loop", 1);
251
		stat_decrement("scheduler.envelope.inflight", 1);
252
		stat_decrement("scheduler.envelope", 1);
253
		scheduler_reset_events();
254
		return;
255
256
	case IMSG_QUEUE_HOLDQ_HOLD:
257
		m_msg(&m, imsg);
258
		m_get_evpid(&m, &evpid);
259
		m_get_id(&m, &holdq);
260
		m_end(&m);
261
		log_trace(TRACE_SCHEDULER,
262
		    "scheduler: holding evp:%016" PRIx64 " on %016" PRIx64,
263
		    evpid, holdq);
264
		backend->hold(evpid, holdq);
265
		ninflight -= 1;
266
		stat_decrement("scheduler.envelope.inflight", 1);
267
		scheduler_reset_events();
268
		return;
269
270
	case IMSG_QUEUE_HOLDQ_RELEASE:
271
		m_msg(&m, imsg);
272
		m_get_int(&m, &type);
273
		m_get_id(&m, &holdq);
274
		m_get_int(&m, &r);
275
		m_end(&m);
276
		log_trace(TRACE_SCHEDULER,
277
		    "scheduler: releasing %d on holdq (%d, %016" PRIx64 ")",
278
		    r, type, holdq);
279
		backend->release(type, holdq, r);
280
		scheduler_reset_events();
281
		return;
282
283
	case IMSG_CTL_PAUSE_MDA:
284
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mda");
285
		env->sc_flags |= SMTPD_MDA_PAUSED;
286
		return;
287
288
	case IMSG_CTL_RESUME_MDA:
289
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mda");
290
		env->sc_flags &= ~SMTPD_MDA_PAUSED;
291
		scheduler_reset_events();
292
		return;
293
294
	case IMSG_CTL_PAUSE_MTA:
295
		log_trace(TRACE_SCHEDULER, "scheduler: pausing mta");
296
		env->sc_flags |= SMTPD_MTA_PAUSED;
297
		return;
298
299
	case IMSG_CTL_RESUME_MTA:
300
		log_trace(TRACE_SCHEDULER, "scheduler: resuming mta");
301
		env->sc_flags &= ~SMTPD_MTA_PAUSED;
302
		scheduler_reset_events();
303
		return;
304
305
	case IMSG_CTL_VERBOSE:
306
		m_msg(&m, imsg);
307
		m_get_int(&m, &v);
308
		m_end(&m);
309
		log_setverbose(v);
310
		return;
311
312
	case IMSG_CTL_PROFILE:
313
		m_msg(&m, imsg);
314
		m_get_int(&m, &v);
315
		m_end(&m);
316
		profiling = v;
317
		return;
318
319
	case IMSG_CTL_LIST_MESSAGES:
320
		msgid = *(uint32_t *)(imsg->data);
321
		n = backend->messages(msgid, msgids, env->sc_scheduler_max_msg_batch_size);
322
		m_compose(p, IMSG_CTL_LIST_MESSAGES, imsg->hdr.peerid, 0, -1,
323
		    msgids, n * sizeof (*msgids));
324
		return;
325
326
	case IMSG_CTL_LIST_ENVELOPES:
327
		id = *(uint64_t *)(imsg->data);
328
		n = backend->envelopes(id, state, env->sc_scheduler_max_evp_batch_size);
329
		for (i = 0; i < n; i++) {
330
			m_create(p_queue, IMSG_CTL_LIST_ENVELOPES,
331
			    imsg->hdr.peerid, 0, -1);
332
			m_add_evpid(p_queue, state[i].evpid);
333
			m_add_int(p_queue, state[i].flags);
334
			m_add_time(p_queue, state[i].time);
335
			m_close(p_queue);
336
		}
337
		m_compose(p_queue, IMSG_CTL_LIST_ENVELOPES,
338
		    imsg->hdr.peerid, 0, -1, NULL, 0);
339
		return;
340
341
	case IMSG_CTL_SCHEDULE:
342
		id = *(uint64_t *)(imsg->data);
343
		if (id <= 0xffffffffL)
344
			log_debug("debug: scheduler: "
345
			    "scheduling msg:%08" PRIx64, id);
346
		else
347
			log_debug("debug: scheduler: "
348
			    "scheduling evp:%016" PRIx64, id);
349
		r = backend->schedule(id);
350
		scheduler_reset_events();
351
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
352
		    0, -1, NULL, 0);
353
		return;
354
355
	case IMSG_QUEUE_ENVELOPE_SCHEDULE:
356
		id = *(uint64_t *)(imsg->data);
357
		backend->schedule(id);
358
		scheduler_reset_events();
359
		return;
360
361
	case IMSG_CTL_REMOVE:
362
		id = *(uint64_t *)(imsg->data);
363
		if (id <= 0xffffffffL)
364
			log_debug("debug: scheduler: "
365
			    "removing msg:%08" PRIx64, id);
366
		else
367
			log_debug("debug: scheduler: "
368
			    "removing evp:%016" PRIx64, id);
369
		r = backend->remove(id);
370
		scheduler_reset_events();
371
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
372
		    0, -1, NULL, 0);
373
		return;
374
375
	case IMSG_CTL_PAUSE_EVP:
376
		id = *(uint64_t *)(imsg->data);
377
		if (id <= 0xffffffffL)
378
			log_debug("debug: scheduler: "
379
			    "suspending msg:%08" PRIx64, id);
380
		else
381
			log_debug("debug: scheduler: "
382
			    "suspending evp:%016" PRIx64, id);
383
		r = backend->suspend(id);
384
		scheduler_reset_events();
385
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
386
		    0, -1, NULL, 0);
387
		return;
388
389
	case IMSG_CTL_RESUME_EVP:
390
		id = *(uint64_t *)(imsg->data);
391
		if (id <= 0xffffffffL)
392
			log_debug("debug: scheduler: "
393
			    "resuming msg:%08" PRIx64, id);
394
		else
395
			log_debug("debug: scheduler: "
396
			    "resuming evp:%016" PRIx64, id);
397
		r = backend->resume(id);
398
		scheduler_reset_events();
399
		m_compose(p, r ? IMSG_CTL_OK : IMSG_CTL_FAIL, imsg->hdr.peerid,
400
		    0, -1, NULL, 0);
401
		return;
402
	}
403
404
	errx(1, "scheduler_imsg: unexpected %s imsg",
405
	    imsg_to_str(imsg->hdr.type));
406
}
407
408
static void
409
scheduler_shutdown(void)
410
{
411
	log_debug("debug: scheduler agent exiting");
412
	_exit(0);
413
}
414
415
static void
416
scheduler_reset_events(void)
417
{
418
	struct timeval	 tv;
419
420
	evtimer_del(&ev);
421
	tv.tv_sec = 0;
422
	tv.tv_usec = 0;
423
	evtimer_add(&ev, &tv);
424
}
425
426
int
427
scheduler(void)
428
{
429
	struct passwd	*pw;
430
431
	backend = scheduler_backend_lookup(backend_scheduler);
432
	if (backend == NULL)
433
		errx(1, "cannot find scheduler backend \"%s\"",
434
		    backend_scheduler);
435
436
	purge_config(PURGE_EVERYTHING);
437
438
	if ((pw = getpwnam(SMTPD_USER)) == NULL)
439
		fatalx("unknown user " SMTPD_USER);
440
441
	config_process(PROC_SCHEDULER);
442
443
	backend->init(backend_scheduler);
444
445
	if (chroot(PATH_CHROOT) == -1)
446
		fatal("scheduler: chroot");
447
	if (chdir("/") == -1)
448
		fatal("scheduler: chdir(\"/\")");
449
450
	if (setgroups(1, &pw->pw_gid) ||
451
	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
452
	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
453
		fatal("scheduler: cannot drop privileges");
454
455
	evpids = xcalloc(env->sc_scheduler_max_schedule, sizeof *evpids, "scheduler: init evpids");
456
	types = xcalloc(env->sc_scheduler_max_schedule, sizeof *types, "scheduler: init types");
457
	msgids = xcalloc(env->sc_scheduler_max_msg_batch_size, sizeof *msgids, "scheduler: list msg");
458
	state = xcalloc(env->sc_scheduler_max_evp_batch_size, sizeof *state, "scheduler: list evp");
459
460
	imsg_callback = scheduler_imsg;
461
	event_init();
462
463
	signal(SIGINT, SIG_IGN);
464
	signal(SIGTERM, SIG_IGN);
465
	signal(SIGPIPE, SIG_IGN);
466
	signal(SIGHUP, SIG_IGN);
467
468
	config_peer(PROC_CONTROL);
469
	config_peer(PROC_QUEUE);
470
471
	evtimer_set(&ev, scheduler_timeout, NULL);
472
	scheduler_reset_events();
473
474
	if (pledge("stdio flock rpath cpath wpath", NULL) == -1)
475
		err(1, "pledge");
476
477
	event_dispatch();
478
	fatalx("exited event loop");
479
480
	return (0);
481
}
482
483
static void
484
scheduler_timeout(int fd, short event, void *p)
485
{
486
	struct timeval		tv;
487
	size_t			i;
488
	size_t			d_inflight;
489
	size_t			d_envelope;
490
	size_t			d_removed;
491
	size_t			d_expired;
492
	size_t			d_updated;
493
	size_t			count;
494
	int			mask, r, delay;
495
496
	tv.tv_sec = 0;
497
	tv.tv_usec = 0;
498
499
	mask = SCHED_UPDATE;
500
501
	if (ninflight <  env->sc_scheduler_max_inflight) {
502
		mask |= SCHED_EXPIRE | SCHED_REMOVE | SCHED_BOUNCE;
503
		if (!(env->sc_flags & SMTPD_MDA_PAUSED))
504
			mask |= SCHED_MDA;
505
		if (!(env->sc_flags & SMTPD_MTA_PAUSED))
506
			mask |= SCHED_MTA;
507
	}
508
509
	count = env->sc_scheduler_max_schedule;
510
511
	log_trace(TRACE_SCHEDULER, "scheduler: getting batch: mask=0x%x, count=%zu", mask, count);
512
513
	r = backend->batch(mask, &delay, &count, evpids, types);
514
515
	log_trace(TRACE_SCHEDULER, "scheduler: got r=%i, delay=%i, count=%zu", r, delay, count);
516
517
	if (r < 0)
518
		fatalx("scheduler: error in batch handler");
519
520
	if (r == 0) {
521
522
		if (delay < -1)
523
			fatalx("scheduler: invalid delay %d", delay);
524
525
		if (delay == -1) {
526
			log_trace(TRACE_SCHEDULER, "scheduler: sleeping");
527
			return;
528
		}
529
530
		tv.tv_sec = delay;
531
		tv.tv_usec = 0;
532
		log_trace(TRACE_SCHEDULER,
533
		    "scheduler: waiting for %s", duration_to_text(tv.tv_sec));
534
		evtimer_add(&ev, &tv);
535
		return;
536
	}
537
538
	d_inflight = 0;
539
	d_envelope = 0;
540
	d_removed = 0;
541
	d_expired = 0;
542
	d_updated = 0;
543
544
	for (i = 0; i < count; i++) {
545
		switch(types[i]) {
546
		case SCHED_REMOVE:
547
			log_debug("debug: scheduler: evp:%016" PRIx64
548
			    " removed", evpids[i]);
549
			m_create(p_queue, IMSG_SCHED_ENVELOPE_REMOVE, 0, 0, -1);
550
			m_add_evpid(p_queue, evpids[i]);
551
			m_close(p_queue);
552
			d_envelope += 1;
553
			d_removed += 1;
554
			d_inflight += 1;
555
			break;
556
557
		case SCHED_EXPIRE:
558
			log_debug("debug: scheduler: evp:%016" PRIx64
559
			    " expired", evpids[i]);
560
			m_create(p_queue, IMSG_SCHED_ENVELOPE_EXPIRE, 0, 0, -1);
561
			m_add_evpid(p_queue, evpids[i]);
562
			m_close(p_queue);
563
			d_envelope += 1;
564
			d_expired += 1;
565
			d_inflight += 1;
566
			break;
567
568
		case SCHED_UPDATE:
569
			log_debug("debug: scheduler: evp:%016" PRIx64
570
			    " scheduled (update)", evpids[i]);
571
			d_updated += 1;
572
			break;
573
574
		case SCHED_BOUNCE:
575
			log_debug("debug: scheduler: evp:%016" PRIx64
576
			    " scheduled (bounce)", evpids[i]);
577
			m_create(p_queue, IMSG_SCHED_ENVELOPE_INJECT, 0, 0, -1);
578
			m_add_evpid(p_queue, evpids[i]);
579
			m_close(p_queue);
580
			d_inflight += 1;
581
			break;
582
583
		case SCHED_MDA:
584
			log_debug("debug: scheduler: evp:%016" PRIx64
585
			    " scheduled (mda)", evpids[i]);
586
			m_create(p_queue, IMSG_SCHED_ENVELOPE_DELIVER, 0, 0, -1);
587
			m_add_evpid(p_queue, evpids[i]);
588
			m_close(p_queue);
589
			d_inflight += 1;
590
			break;
591
592
		case SCHED_MTA:
593
			log_debug("debug: scheduler: evp:%016" PRIx64
594
			    " scheduled (mta)", evpids[i]);
595
			m_create(p_queue, IMSG_SCHED_ENVELOPE_TRANSFER, 0, 0, -1);
596
			m_add_evpid(p_queue, evpids[i]);
597
			m_close(p_queue);
598
			d_inflight += 1;
599
			break;
600
		}
601
	}
602
603
	stat_decrement("scheduler.envelope", d_envelope);
604
	stat_increment("scheduler.envelope.inflight", d_inflight);
605
	stat_increment("scheduler.envelope.expired", d_expired);
606
	stat_increment("scheduler.envelope.removed", d_removed);
607
	stat_increment("scheduler.envelope.updated", d_updated);
608
609
	ninflight += d_inflight;
610
611
	tv.tv_sec = 0;
612
	tv.tv_usec = 0;
613
	evtimer_add(&ev, &tv);
614
}