GCC Code Coverage Report
Directory: ./ Exec Total Coverage
File: usr.sbin/smtpd/smtpd/../queue_backend.c Lines: 0 322 0.0 %
Date: 2017-11-07 Branches: 0 180 0.0 %

Line Branch Exec Source
1
/*	$OpenBSD: queue_backend.c,v 1.62 2016/02/04 12:46:28 eric Exp $	*/
2
3
/*
4
 * Copyright (c) 2011 Gilles Chehade <gilles@poolp.org>
5
 *
6
 * Permission to use, copy, modify, and distribute this software for any
7
 * purpose with or without fee is hereby granted, provided that the above
8
 * copyright notice and this permission notice appear in all copies.
9
 *
10
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17
 */
18
19
#include <sys/types.h>
20
#include <sys/queue.h>
21
#include <sys/tree.h>
22
#include <sys/socket.h>
23
#include <sys/stat.h>
24
25
#include <ctype.h>
26
#include <err.h>
27
#include <errno.h>
28
#include <event.h>
29
#include <fcntl.h>
30
#include <grp.h>
31
#include <imsg.h>
32
#include <limits.h>
33
#include <inttypes.h>
34
#include <libgen.h>
35
#include <pwd.h>
36
#include <stdio.h>
37
#include <stdlib.h>
38
#include <string.h>
39
#include <time.h>
40
#include <unistd.h>
41
42
#include "smtpd.h"
43
#include "log.h"
44
45
static const char* envelope_validate(struct envelope *);
46
47
extern struct queue_backend	queue_backend_fs;
48
extern struct queue_backend	queue_backend_null;
49
extern struct queue_backend	queue_backend_proc;
50
extern struct queue_backend	queue_backend_ram;
51
52
static void queue_envelope_cache_add(struct envelope *);
53
static void queue_envelope_cache_update(struct envelope *);
54
static void queue_envelope_cache_del(uint64_t evpid);
55
56
TAILQ_HEAD(evplst, envelope);
57
58
static struct tree		evpcache_tree;
59
static struct evplst		evpcache_list;
60
static struct queue_backend	*backend;
61
62
static int (*handler_close)(void);
63
static int (*handler_message_create)(uint32_t *);
64
static int (*handler_message_commit)(uint32_t, const char*);
65
static int (*handler_message_delete)(uint32_t);
66
static int (*handler_message_fd_r)(uint32_t);
67
static int (*handler_message_corrupt)(uint32_t);
68
static int (*handler_message_uncorrupt)(uint32_t);
69
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *);
70
static int (*handler_envelope_delete)(uint64_t);
71
static int (*handler_envelope_update)(uint64_t, const char *, size_t);
72
static int (*handler_envelope_load)(uint64_t, char *, size_t);
73
static int (*handler_envelope_walk)(uint64_t *, char *, size_t);
74
static int (*handler_message_walk)(uint64_t *, char *, size_t,
75
    uint32_t, int *, void **);
76
77
#ifdef QUEUE_PROFILING

/* Name and start time of the call currently being timed. */
static struct {
	struct timespec	 t0;
	const char	*name;
} profile;

/* Remember `name' and the current monotonic time if queue profiling is on. */
static inline void
profile_enter(const char *name)
{
	if (profiling & PROFILE_QUEUE) {
		profile.name = name;
		clock_gettime(CLOCK_MONOTONIC, &profile.t0);
	}
}

/* Log the time elapsed since the last profile_enter() if profiling is on. */
static inline void
profile_leave(void)
{
	struct timespec	 now, elapsed;

	if (!(profiling & PROFILE_QUEUE))
		return;

	clock_gettime(CLOCK_MONOTONIC, &now);
	timespecsub(&now, &profile.t0, &elapsed);
	log_debug("profile-queue: %s %lld.%09ld", profile.name,
	    (long long)elapsed.tv_sec, elapsed.tv_nsec);
}

#else

/* Profiling compiled out: both hooks expand to empty statements. */
#define profile_enter(x)	do {} while (0)
#define profile_leave()		do {} while (0)

#endif
109
110
static int
111
queue_message_path(uint32_t msgid, char *buf, size_t len)
112
{
113
	return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid);
114
}
115
116
int
117
queue_init(const char *name, int server)
118
{
119
	struct passwd	*pwq;
120
	struct group	*gr;
121
	int		 r;
122
123
	pwq = getpwnam(SMTPD_QUEUE_USER);
124
	if (pwq == NULL)
125
		errx(1, "unknown user %s", SMTPD_QUEUE_USER);
126
127
	gr = getgrnam(SMTPD_QUEUE_GROUP);
128
	if (gr == NULL)
129
		errx(1, "unknown group %s", SMTPD_QUEUE_GROUP);
130
131
	tree_init(&evpcache_tree);
132
	TAILQ_INIT(&evpcache_list);
133
134
	if (!strcmp(name, "fs"))
135
		backend = &queue_backend_fs;
136
	else if (!strcmp(name, "null"))
137
		backend = &queue_backend_null;
138
	else if (!strcmp(name, "ram"))
139
		backend = &queue_backend_ram;
140
	else
141
		backend = &queue_backend_proc;
142
143
	if (server) {
144
		if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0)
145
			errx(1, "error in spool directory setup");
146
		if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0)
147
			errx(1, "error in offline directory setup");
148
		if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0)
149
			errx(1, "error in purge directory setup");
150
151
		mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE);
152
153
		if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0)
154
			errx(1, "error in purge directory setup");
155
	}
156
157
	r = backend->init(pwq, server, name);
158
159
	log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r);
160
161
	return (r);
162
}
163
164
int
165
queue_close(void)
166
{
167
	if (handler_close)
168
		return (handler_close());
169
170
	return (1);
171
}
172
173
int
174
queue_message_create(uint32_t *msgid)
175
{
176
	int	r;
177
178
	profile_enter("queue_message_create");
179
	r = handler_message_create(msgid);
180
	profile_leave();
181
182
	log_trace(TRACE_QUEUE,
183
	    "queue-backend: queue_message_create() -> %d (%08"PRIx32")",
184
	    r, *msgid);
185
186
	return (r);
187
}
188
189
int
190
queue_message_delete(uint32_t msgid)
191
{
192
	char	msgpath[PATH_MAX];
193
	uint64_t evpid;
194
	void   *iter;
195
	int	r;
196
197
	profile_enter("queue_message_delete");
198
	r = handler_message_delete(msgid);
199
	profile_leave();
200
201
	/* in case the message is incoming */
202
	queue_message_path(msgid, msgpath, sizeof(msgpath));
203
	unlink(msgpath);
204
205
	/* remove remaining envelopes from the cache if any (on rollback) */
206
	evpid = msgid_to_evpid(msgid);
207
	for (;;) {
208
		iter = NULL;
209
		if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL))
210
			break;
211
		if (evpid_to_msgid(evpid) != msgid)
212
			break;
213
		queue_envelope_cache_del(evpid);
214
	}
215
216
	log_trace(TRACE_QUEUE,
217
	    "queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r);
218
219
	return (r);
220
}
221
222
int
223
queue_message_commit(uint32_t msgid)
224
{
225
	int	r;
226
	char	msgpath[PATH_MAX];
227
	char	tmppath[PATH_MAX];
228
	FILE	*ifp = NULL;
229
	FILE	*ofp = NULL;
230
231
	profile_enter("queue_message_commit");
232
233
	queue_message_path(msgid, msgpath, sizeof(msgpath));
234
235
	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
236
		bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath);
237
		ifp = fopen(msgpath, "r");
238
		ofp = fopen(tmppath, "w+");
239
		if (ifp == NULL || ofp == NULL)
240
			goto err;
241
		if (!compress_file(ifp, ofp))
242
			goto err;
243
		fclose(ifp);
244
		fclose(ofp);
245
		ifp = NULL;
246
		ofp = NULL;
247
248
		if (rename(tmppath, msgpath) == -1) {
249
			if (errno == ENOSPC)
250
				return (0);
251
			unlink(tmppath);
252
			log_warn("rename");
253
			return (0);
254
		}
255
	}
256
257
	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
258
		bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath);
259
		ifp = fopen(msgpath, "r");
260
		ofp = fopen(tmppath, "w+");
261
		if (ifp == NULL || ofp == NULL)
262
			goto err;
263
		if (!crypto_encrypt_file(ifp, ofp))
264
			goto err;
265
		fclose(ifp);
266
		fclose(ofp);
267
		ifp = NULL;
268
		ofp = NULL;
269
270
		if (rename(tmppath, msgpath) == -1) {
271
			if (errno == ENOSPC)
272
				return (0);
273
			unlink(tmppath);
274
			log_warn("rename");
275
			return (0);
276
		}
277
	}
278
279
	r = handler_message_commit(msgid, msgpath);
280
	profile_leave();
281
282
	/* in case it's not done by the backend */
283
	unlink(msgpath);
284
285
	log_trace(TRACE_QUEUE,
286
	    "queue-backend: queue_message_commit(%08"PRIx32") -> %d",
287
	    msgid, r);
288
289
	return (r);
290
291
err:
292
	if (ifp)
293
		fclose(ifp);
294
	if (ofp)
295
		fclose(ofp);
296
	return 0;
297
}
298
299
int
300
queue_message_corrupt(uint32_t msgid)
301
{
302
	int	r;
303
304
	profile_enter("queue_message_corrupt");
305
	r = handler_message_corrupt(msgid);
306
	profile_leave();
307
308
	log_trace(TRACE_QUEUE,
309
	    "queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r);
310
311
	return (r);
312
}
313
314
/*
 * Forward `msgid' to the backend's uncorrupt handler.
 * Returns the handler's result.
 */
int
queue_message_uncorrupt(uint32_t msgid)
{
	int	rc;

	rc = handler_message_uncorrupt(msgid);

	return (rc);
}
319
320
int
321
queue_message_fd_r(uint32_t msgid)
322
{
323
	int	fdin = -1, fdout = -1, fd = -1;
324
	FILE	*ifp = NULL;
325
	FILE	*ofp = NULL;
326
327
	profile_enter("queue_message_fd_r");
328
	fdin = handler_message_fd_r(msgid);
329
	profile_leave();
330
331
	log_trace(TRACE_QUEUE,
332
	    "queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin);
333
334
	if (fdin == -1)
335
		return (-1);
336
337
	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
338
		if ((fdout = mktmpfile()) == -1)
339
			goto err;
340
		if ((fd = dup(fdout)) == -1)
341
			goto err;
342
		if ((ifp = fdopen(fdin, "r")) == NULL)
343
			goto err;
344
		fdin = fd;
345
		fd = -1;
346
		if ((ofp = fdopen(fdout, "w+")) == NULL)
347
			goto err;
348
349
		if (!crypto_decrypt_file(ifp, ofp))
350
			goto err;
351
352
		fclose(ifp);
353
		ifp = NULL;
354
		fclose(ofp);
355
		ofp = NULL;
356
		lseek(fdin, SEEK_SET, 0);
357
	}
358
359
	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
360
		if ((fdout = mktmpfile()) == -1)
361
			goto err;
362
		if ((fd = dup(fdout)) == -1)
363
			goto err;
364
		if ((ifp = fdopen(fdin, "r")) == NULL)
365
			goto err;
366
		fdin = fd;
367
		fd = -1;
368
		if ((ofp = fdopen(fdout, "w+")) == NULL)
369
			goto err;
370
371
		if (!uncompress_file(ifp, ofp))
372
			goto err;
373
374
		fclose(ifp);
375
		ifp = NULL;
376
		fclose(ofp);
377
		ofp = NULL;
378
		lseek(fdin, SEEK_SET, 0);
379
	}
380
381
	return (fdin);
382
383
err:
384
	if (fd != -1)
385
		close(fd);
386
	if (fdin != -1)
387
		close(fdin);
388
	if (fdout != -1)
389
		close(fdout);
390
	if (ifp)
391
		fclose(ifp);
392
	if (ofp)
393
		fclose(ofp);
394
	return -1;
395
}
396
397
int
398
queue_message_fd_rw(uint32_t msgid)
399
{
400
	char buf[PATH_MAX];
401
402
	queue_message_path(msgid, buf, sizeof(buf));
403
404
	return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600);
405
}
406
407
static int
408
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
409
{
410
	char   *evp;
411
	size_t	evplen;
412
	size_t	complen;
413
	char	compbuf[sizeof(struct envelope)];
414
	size_t	enclen;
415
	char	encbuf[sizeof(struct envelope)];
416
417
	evp = evpbuf;
418
	evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize);
419
	if (evplen == 0)
420
		return (0);
421
422
	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
423
		complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf);
424
		if (complen == 0)
425
			return (0);
426
		evp = compbuf;
427
		evplen = complen;
428
	}
429
430
	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
431
		enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
432
		if (enclen == 0)
433
			return (0);
434
		evp = encbuf;
435
		evplen = enclen;
436
	}
437
438
	memmove(evpbuf, evp, evplen);
439
440
	return (evplen);
441
}
442
443
static int
444
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize)
445
{
446
	char		*evp;
447
	size_t		 evplen;
448
	char		 compbuf[sizeof(struct envelope)];
449
	size_t		 complen;
450
	char		 encbuf[sizeof(struct envelope)];
451
	size_t		 enclen;
452
453
	evp = evpbuf;
454
	evplen = evpbufsize;
455
456
	if (env->sc_queue_flags & QUEUE_ENCRYPTION) {
457
		enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf);
458
		if (enclen == 0)
459
			return (0);
460
		evp = encbuf;
461
		evplen = enclen;
462
	}
463
464
	if (env->sc_queue_flags & QUEUE_COMPRESSION) {
465
		complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf);
466
		if (complen == 0)
467
			return (0);
468
		evp = compbuf;
469
		evplen = complen;
470
	}
471
472
	return (envelope_load_buffer(ep, evp, evplen));
473
}
474
475
static void
476
queue_envelope_cache_add(struct envelope *e)
477
{
478
	struct envelope *cached;
479
480
	while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size)
481
		queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id);
482
483
	cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add");
484
	*cached = *e;
485
	TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
486
	tree_xset(&evpcache_tree, e->id, cached);
487
	stat_increment("queue.evpcache.size", 1);
488
}
489
490
static void
491
queue_envelope_cache_update(struct envelope *e)
492
{
493
	struct envelope *cached;
494
495
	if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) {
496
		queue_envelope_cache_add(e);
497
		stat_increment("queue.evpcache.update.missed", 1);
498
	} else {
499
		TAILQ_REMOVE(&evpcache_list, cached, entry);
500
		*cached = *e;
501
		TAILQ_INSERT_HEAD(&evpcache_list, cached, entry);
502
		stat_increment("queue.evpcache.update.hit", 1);
503
	}
504
}
505
506
static void
507
queue_envelope_cache_del(uint64_t evpid)
508
{
509
	struct envelope *cached;
510
511
	if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL)
512
		return;
513
514
	TAILQ_REMOVE(&evpcache_list, cached, entry);
515
	free(cached);
516
	stat_decrement("queue.evpcache.size", 1);
517
}
518
519
int
520
queue_envelope_create(struct envelope *ep)
521
{
522
	int		 r;
523
	char		 evpbuf[sizeof(struct envelope)];
524
	size_t		 evplen;
525
	uint64_t	 evpid;
526
	uint32_t	 msgid;
527
528
	ep->creation = time(NULL);
529
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
530
	if (evplen == 0)
531
		return (0);
532
533
	evpid = ep->id;
534
	msgid = evpid_to_msgid(evpid);
535
536
	profile_enter("queue_envelope_create");
537
	r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id);
538
	profile_leave();
539
540
	log_trace(TRACE_QUEUE,
541
	    "queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")",
542
	    evpid, evplen, r, ep->id);
543
544
	if (!r) {
545
		ep->creation = 0;
546
		ep->id = 0;
547
	}
548
549
	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
550
		queue_envelope_cache_add(ep);
551
552
	return (r);
553
}
554
555
int
556
queue_envelope_delete(uint64_t evpid)
557
{
558
	int	r;
559
560
	if (env->sc_queue_flags & QUEUE_EVPCACHE)
561
		queue_envelope_cache_del(evpid);
562
563
	profile_enter("queue_envelope_delete");
564
	r = handler_envelope_delete(evpid);
565
	profile_leave();
566
567
	log_trace(TRACE_QUEUE,
568
	    "queue-backend: queue_envelope_delete(%016"PRIx64") -> %d",
569
	    evpid, r);
570
571
	return (r);
572
}
573
574
int
575
queue_envelope_load(uint64_t evpid, struct envelope *ep)
576
{
577
	const char	*e;
578
	char		 evpbuf[sizeof(struct envelope)];
579
	size_t		 evplen;
580
	struct envelope	*cached;
581
582
	if ((env->sc_queue_flags & QUEUE_EVPCACHE) &&
583
	    (cached = tree_get(&evpcache_tree, evpid))) {
584
		*ep = *cached;
585
		stat_increment("queue.evpcache.load.hit", 1);
586
		return (1);
587
	}
588
589
	ep->id = evpid;
590
	profile_enter("queue_envelope_load");
591
	evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf);
592
	profile_leave();
593
594
	log_trace(TRACE_QUEUE,
595
	    "queue-backend: queue_envelope_load(%016"PRIx64") -> %zu",
596
	    evpid, evplen);
597
598
	if (evplen == 0)
599
		return (0);
600
601
	if (queue_envelope_load_buffer(ep, evpbuf, evplen)) {
602
		if ((e = envelope_validate(ep)) == NULL) {
603
			ep->id = evpid;
604
			if (env->sc_queue_flags & QUEUE_EVPCACHE) {
605
				queue_envelope_cache_add(ep);
606
				stat_increment("queue.evpcache.load.missed", 1);
607
			}
608
			return (1);
609
		}
610
		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
611
		    ep->id, e);
612
	}
613
614
	(void)queue_message_corrupt(evpid_to_msgid(evpid));
615
	return (0);
616
}
617
618
int
619
queue_envelope_update(struct envelope *ep)
620
{
621
	char	evpbuf[sizeof(struct envelope)];
622
	size_t	evplen;
623
	int	r;
624
625
	evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf);
626
	if (evplen == 0)
627
		return (0);
628
629
	profile_enter("queue_envelope_update");
630
	r = handler_envelope_update(ep->id, evpbuf, evplen);
631
	profile_leave();
632
633
	if (r && env->sc_queue_flags & QUEUE_EVPCACHE)
634
		queue_envelope_cache_update(ep);
635
636
	log_trace(TRACE_QUEUE,
637
	    "queue-backend: queue_envelope_update(%016"PRIx64") -> %d",
638
	    ep->id, r);
639
640
	return (r);
641
}
642
643
int
644
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data)
645
{
646
	char		 evpbuf[sizeof(struct envelope)];
647
	uint64_t	 evpid;
648
	int		 r;
649
	const char	*e;
650
651
	profile_enter("queue_message_walk");
652
	r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf,
653
	    msgid, done, data);
654
	profile_leave();
655
656
	log_trace(TRACE_QUEUE,
657
	    "queue-backend: queue_message_walk() -> %d (%016"PRIx64")",
658
	    r, evpid);
659
660
	if (r == -1)
661
		return (r);
662
663
	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
664
		if ((e = envelope_validate(ep)) == NULL) {
665
			ep->id = evpid;
666
			/*
667
			 * do not cache the envelope here, while discovering
668
			 * envelopes one could re-run discover on already
669
			 * scheduled envelopes which leads to triggering of
670
			 * strict checks in caching. Envelopes could anyway
671
			 * be loaded from backend if it isn't cached.
672
			 */
673
			return (1);
674
		}
675
		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
676
		    ep->id, e);
677
		(void)queue_message_corrupt(evpid_to_msgid(evpid));
678
	}
679
680
	return (0);
681
}
682
683
int
684
queue_envelope_walk(struct envelope *ep)
685
{
686
	const char	*e;
687
	uint64_t	 evpid;
688
	char		 evpbuf[sizeof(struct envelope)];
689
	int		 r;
690
691
	profile_enter("queue_envelope_walk");
692
	r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf);
693
	profile_leave();
694
695
	log_trace(TRACE_QUEUE,
696
	    "queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")",
697
	    r, evpid);
698
699
	if (r == -1)
700
		return (r);
701
702
	if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) {
703
		if ((e = envelope_validate(ep)) == NULL) {
704
			ep->id = evpid;
705
			if (env->sc_queue_flags & QUEUE_EVPCACHE)
706
				queue_envelope_cache_add(ep);
707
			return (1);
708
		}
709
		log_debug("debug: invalid envelope %016" PRIx64 ": %s",
710
		    ep->id, e);
711
		(void)queue_message_corrupt(evpid_to_msgid(evpid));
712
	}
713
714
	return (0);
715
}
716
717
uint32_t
718
queue_generate_msgid(void)
719
{
720
	uint32_t msgid;
721
722
	while ((msgid = arc4random()) == 0)
723
		;
724
725
	return msgid;
726
}
727
728
uint64_t
729
queue_generate_evpid(uint32_t msgid)
730
{
731
	uint32_t rnd;
732
	uint64_t evpid;
733
734
	while ((rnd = arc4random()) == 0)
735
		;
736
737
	evpid = msgid;
738
	evpid <<= 32;
739
	evpid |= rnd;
740
741
	return evpid;
742
}
743
744
static const char*
745
envelope_validate(struct envelope *ep)
746
{
747
	if (ep->version != SMTPD_ENVELOPE_VERSION)
748
		return "version mismatch";
749
750
	if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL)
751
		return "invalid helo";
752
	if (ep->helo[0] == '\0')
753
		return "empty helo";
754
755
	if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL)
756
		return "invalid hostname";
757
	if (ep->hostname[0] == '\0')
758
		return "empty hostname";
759
760
	if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL)
761
		return "invalid error line";
762
763
	return NULL;
764
}
765
766
void
767
queue_api_on_close(int(*cb)(void))
768
{
769
	handler_close = cb;
770
}
771
772
void
773
queue_api_on_message_create(int(*cb)(uint32_t *))
774
{
775
	handler_message_create = cb;
776
}
777
778
void
779
queue_api_on_message_commit(int(*cb)(uint32_t, const char *))
780
{
781
	handler_message_commit = cb;
782
}
783
784
void
785
queue_api_on_message_delete(int(*cb)(uint32_t))
786
{
787
	handler_message_delete = cb;
788
}
789
790
void
791
queue_api_on_message_fd_r(int(*cb)(uint32_t))
792
{
793
	handler_message_fd_r = cb;
794
}
795
796
void
797
queue_api_on_message_corrupt(int(*cb)(uint32_t))
798
{
799
	handler_message_corrupt = cb;
800
}
801
802
void
803
queue_api_on_message_uncorrupt(int(*cb)(uint32_t))
804
{
805
	handler_message_uncorrupt = cb;
806
}
807
808
void
809
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *))
810
{
811
	handler_envelope_create = cb;
812
}
813
814
void
815
queue_api_on_envelope_delete(int(*cb)(uint64_t))
816
{
817
	handler_envelope_delete = cb;
818
}
819
820
void
821
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t))
822
{
823
	handler_envelope_update = cb;
824
}
825
826
void
827
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t))
828
{
829
	handler_envelope_load = cb;
830
}
831
832
void
833
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t))
834
{
835
	handler_envelope_walk = cb;
836
}
837
838
void
839
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t,
840
    uint32_t, int *, void **))
841
{
842
	handler_message_walk = cb;
843
}