1 |
|
|
/* $OpenBSD: queue_backend.c,v 1.62 2016/02/04 12:46:28 eric Exp $ */ |
2 |
|
|
|
3 |
|
|
/* |
4 |
|
|
* Copyright (c) 2011 Gilles Chehade <gilles@poolp.org> |
5 |
|
|
* |
6 |
|
|
* Permission to use, copy, modify, and distribute this software for any |
7 |
|
|
* purpose with or without fee is hereby granted, provided that the above |
8 |
|
|
* copyright notice and this permission notice appear in all copies. |
9 |
|
|
* |
10 |
|
|
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES |
11 |
|
|
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF |
12 |
|
|
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR |
13 |
|
|
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES |
14 |
|
|
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN |
15 |
|
|
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF |
16 |
|
|
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
17 |
|
|
*/ |
18 |
|
|
|
19 |
|
|
#include <sys/types.h> |
20 |
|
|
#include <sys/queue.h> |
21 |
|
|
#include <sys/tree.h> |
22 |
|
|
#include <sys/socket.h> |
23 |
|
|
#include <sys/stat.h> |
24 |
|
|
|
25 |
|
|
#include <ctype.h> |
26 |
|
|
#include <err.h> |
27 |
|
|
#include <errno.h> |
28 |
|
|
#include <event.h> |
29 |
|
|
#include <fcntl.h> |
30 |
|
|
#include <grp.h> |
31 |
|
|
#include <imsg.h> |
32 |
|
|
#include <limits.h> |
33 |
|
|
#include <inttypes.h> |
34 |
|
|
#include <libgen.h> |
35 |
|
|
#include <pwd.h> |
36 |
|
|
#include <stdio.h> |
37 |
|
|
#include <stdlib.h> |
38 |
|
|
#include <string.h> |
39 |
|
|
#include <time.h> |
40 |
|
|
#include <unistd.h> |
41 |
|
|
|
42 |
|
|
#include "smtpd.h" |
43 |
|
|
#include "log.h" |
44 |
|
|
|
45 |
|
|
static const char* envelope_validate(struct envelope *); |
46 |
|
|
|
47 |
|
|
extern struct queue_backend queue_backend_fs; |
48 |
|
|
extern struct queue_backend queue_backend_null; |
49 |
|
|
extern struct queue_backend queue_backend_proc; |
50 |
|
|
extern struct queue_backend queue_backend_ram; |
51 |
|
|
|
52 |
|
|
static void queue_envelope_cache_add(struct envelope *); |
53 |
|
|
static void queue_envelope_cache_update(struct envelope *); |
54 |
|
|
static void queue_envelope_cache_del(uint64_t evpid); |
55 |
|
|
|
56 |
|
|
TAILQ_HEAD(evplst, envelope); |
57 |
|
|
|
58 |
|
|
static struct tree evpcache_tree; |
59 |
|
|
static struct evplst evpcache_list; |
60 |
|
|
static struct queue_backend *backend; |
61 |
|
|
|
62 |
|
|
static int (*handler_close)(void); |
63 |
|
|
static int (*handler_message_create)(uint32_t *); |
64 |
|
|
static int (*handler_message_commit)(uint32_t, const char*); |
65 |
|
|
static int (*handler_message_delete)(uint32_t); |
66 |
|
|
static int (*handler_message_fd_r)(uint32_t); |
67 |
|
|
static int (*handler_message_corrupt)(uint32_t); |
68 |
|
|
static int (*handler_message_uncorrupt)(uint32_t); |
69 |
|
|
static int (*handler_envelope_create)(uint32_t, const char *, size_t, uint64_t *); |
70 |
|
|
static int (*handler_envelope_delete)(uint64_t); |
71 |
|
|
static int (*handler_envelope_update)(uint64_t, const char *, size_t); |
72 |
|
|
static int (*handler_envelope_load)(uint64_t, char *, size_t); |
73 |
|
|
static int (*handler_envelope_walk)(uint64_t *, char *, size_t); |
74 |
|
|
static int (*handler_message_walk)(uint64_t *, char *, size_t, |
75 |
|
|
uint32_t, int *, void **); |
76 |
|
|
|
77 |
|
|
#ifdef QUEUE_PROFILING |
78 |
|
|
|
79 |
|
|
static struct { |
80 |
|
|
struct timespec t0; |
81 |
|
|
const char *name; |
82 |
|
|
} profile; |
83 |
|
|
|
84 |
|
|
static inline void profile_enter(const char *name) |
85 |
|
|
{ |
86 |
|
|
if ((profiling & PROFILE_QUEUE) == 0) |
87 |
|
|
return; |
88 |
|
|
|
89 |
|
|
profile.name = name; |
90 |
|
|
clock_gettime(CLOCK_MONOTONIC, &profile.t0); |
91 |
|
|
} |
92 |
|
|
|
93 |
|
|
static inline void profile_leave(void) |
94 |
|
|
{ |
95 |
|
|
struct timespec t1, dt; |
96 |
|
|
|
97 |
|
|
if ((profiling & PROFILE_QUEUE) == 0) |
98 |
|
|
return; |
99 |
|
|
|
100 |
|
|
clock_gettime(CLOCK_MONOTONIC, &t1); |
101 |
|
|
timespecsub(&t1, &profile.t0, &dt); |
102 |
|
|
log_debug("profile-queue: %s %lld.%09ld", profile.name, |
103 |
|
|
(long long)dt.tv_sec, dt.tv_nsec); |
104 |
|
|
} |
105 |
|
|
#else |
106 |
|
|
#define profile_enter(x) do {} while (0) |
107 |
|
|
#define profile_leave() do {} while (0) |
108 |
|
|
#endif |
109 |
|
|
|
110 |
|
|
static int |
111 |
|
|
queue_message_path(uint32_t msgid, char *buf, size_t len) |
112 |
|
|
{ |
113 |
|
|
return bsnprintf(buf, len, "%s/%08"PRIx32, PATH_TEMPORARY, msgid); |
114 |
|
|
} |
115 |
|
|
|
116 |
|
|
int |
117 |
|
|
queue_init(const char *name, int server) |
118 |
|
|
{ |
119 |
|
|
struct passwd *pwq; |
120 |
|
|
struct group *gr; |
121 |
|
|
int r; |
122 |
|
|
|
123 |
|
|
pwq = getpwnam(SMTPD_QUEUE_USER); |
124 |
|
|
if (pwq == NULL) |
125 |
|
|
errx(1, "unknown user %s", SMTPD_QUEUE_USER); |
126 |
|
|
|
127 |
|
|
gr = getgrnam(SMTPD_QUEUE_GROUP); |
128 |
|
|
if (gr == NULL) |
129 |
|
|
errx(1, "unknown group %s", SMTPD_QUEUE_GROUP); |
130 |
|
|
|
131 |
|
|
tree_init(&evpcache_tree); |
132 |
|
|
TAILQ_INIT(&evpcache_list); |
133 |
|
|
|
134 |
|
|
if (!strcmp(name, "fs")) |
135 |
|
|
backend = &queue_backend_fs; |
136 |
|
|
else if (!strcmp(name, "null")) |
137 |
|
|
backend = &queue_backend_null; |
138 |
|
|
else if (!strcmp(name, "ram")) |
139 |
|
|
backend = &queue_backend_ram; |
140 |
|
|
else |
141 |
|
|
backend = &queue_backend_proc; |
142 |
|
|
|
143 |
|
|
if (server) { |
144 |
|
|
if (ckdir(PATH_SPOOL, 0711, 0, 0, 1) == 0) |
145 |
|
|
errx(1, "error in spool directory setup"); |
146 |
|
|
if (ckdir(PATH_SPOOL PATH_OFFLINE, 0770, 0, gr->gr_gid, 1) == 0) |
147 |
|
|
errx(1, "error in offline directory setup"); |
148 |
|
|
if (ckdir(PATH_SPOOL PATH_PURGE, 0700, pwq->pw_uid, 0, 1) == 0) |
149 |
|
|
errx(1, "error in purge directory setup"); |
150 |
|
|
|
151 |
|
|
mvpurge(PATH_SPOOL PATH_TEMPORARY, PATH_SPOOL PATH_PURGE); |
152 |
|
|
|
153 |
|
|
if (ckdir(PATH_SPOOL PATH_TEMPORARY, 0700, pwq->pw_uid, 0, 1) == 0) |
154 |
|
|
errx(1, "error in purge directory setup"); |
155 |
|
|
} |
156 |
|
|
|
157 |
|
|
r = backend->init(pwq, server, name); |
158 |
|
|
|
159 |
|
|
log_trace(TRACE_QUEUE, "queue-backend: queue_init(%d) -> %d", server, r); |
160 |
|
|
|
161 |
|
|
return (r); |
162 |
|
|
} |
163 |
|
|
|
164 |
|
|
int |
165 |
|
|
queue_close(void) |
166 |
|
|
{ |
167 |
|
|
if (handler_close) |
168 |
|
|
return (handler_close()); |
169 |
|
|
|
170 |
|
|
return (1); |
171 |
|
|
} |
172 |
|
|
|
173 |
|
|
int |
174 |
|
|
queue_message_create(uint32_t *msgid) |
175 |
|
|
{ |
176 |
|
|
int r; |
177 |
|
|
|
178 |
|
|
profile_enter("queue_message_create"); |
179 |
|
|
r = handler_message_create(msgid); |
180 |
|
|
profile_leave(); |
181 |
|
|
|
182 |
|
|
log_trace(TRACE_QUEUE, |
183 |
|
|
"queue-backend: queue_message_create() -> %d (%08"PRIx32")", |
184 |
|
|
r, *msgid); |
185 |
|
|
|
186 |
|
|
return (r); |
187 |
|
|
} |
188 |
|
|
|
189 |
|
|
int |
190 |
|
|
queue_message_delete(uint32_t msgid) |
191 |
|
|
{ |
192 |
|
|
char msgpath[PATH_MAX]; |
193 |
|
|
uint64_t evpid; |
194 |
|
|
void *iter; |
195 |
|
|
int r; |
196 |
|
|
|
197 |
|
|
profile_enter("queue_message_delete"); |
198 |
|
|
r = handler_message_delete(msgid); |
199 |
|
|
profile_leave(); |
200 |
|
|
|
201 |
|
|
/* in case the message is incoming */ |
202 |
|
|
queue_message_path(msgid, msgpath, sizeof(msgpath)); |
203 |
|
|
unlink(msgpath); |
204 |
|
|
|
205 |
|
|
/* remove remaining envelopes from the cache if any (on rollback) */ |
206 |
|
|
evpid = msgid_to_evpid(msgid); |
207 |
|
|
for (;;) { |
208 |
|
|
iter = NULL; |
209 |
|
|
if (!tree_iterfrom(&evpcache_tree, &iter, evpid, &evpid, NULL)) |
210 |
|
|
break; |
211 |
|
|
if (evpid_to_msgid(evpid) != msgid) |
212 |
|
|
break; |
213 |
|
|
queue_envelope_cache_del(evpid); |
214 |
|
|
} |
215 |
|
|
|
216 |
|
|
log_trace(TRACE_QUEUE, |
217 |
|
|
"queue-backend: queue_message_delete(%08"PRIx32") -> %d", msgid, r); |
218 |
|
|
|
219 |
|
|
return (r); |
220 |
|
|
} |
221 |
|
|
|
222 |
|
|
int |
223 |
|
|
queue_message_commit(uint32_t msgid) |
224 |
|
|
{ |
225 |
|
|
int r; |
226 |
|
|
char msgpath[PATH_MAX]; |
227 |
|
|
char tmppath[PATH_MAX]; |
228 |
|
|
FILE *ifp = NULL; |
229 |
|
|
FILE *ofp = NULL; |
230 |
|
|
|
231 |
|
|
profile_enter("queue_message_commit"); |
232 |
|
|
|
233 |
|
|
queue_message_path(msgid, msgpath, sizeof(msgpath)); |
234 |
|
|
|
235 |
|
|
if (env->sc_queue_flags & QUEUE_COMPRESSION) { |
236 |
|
|
bsnprintf(tmppath, sizeof tmppath, "%s.comp", msgpath); |
237 |
|
|
ifp = fopen(msgpath, "r"); |
238 |
|
|
ofp = fopen(tmppath, "w+"); |
239 |
|
|
if (ifp == NULL || ofp == NULL) |
240 |
|
|
goto err; |
241 |
|
|
if (!compress_file(ifp, ofp)) |
242 |
|
|
goto err; |
243 |
|
|
fclose(ifp); |
244 |
|
|
fclose(ofp); |
245 |
|
|
ifp = NULL; |
246 |
|
|
ofp = NULL; |
247 |
|
|
|
248 |
|
|
if (rename(tmppath, msgpath) == -1) { |
249 |
|
|
if (errno == ENOSPC) |
250 |
|
|
return (0); |
251 |
|
|
unlink(tmppath); |
252 |
|
|
log_warn("rename"); |
253 |
|
|
return (0); |
254 |
|
|
} |
255 |
|
|
} |
256 |
|
|
|
257 |
|
|
if (env->sc_queue_flags & QUEUE_ENCRYPTION) { |
258 |
|
|
bsnprintf(tmppath, sizeof tmppath, "%s.enc", msgpath); |
259 |
|
|
ifp = fopen(msgpath, "r"); |
260 |
|
|
ofp = fopen(tmppath, "w+"); |
261 |
|
|
if (ifp == NULL || ofp == NULL) |
262 |
|
|
goto err; |
263 |
|
|
if (!crypto_encrypt_file(ifp, ofp)) |
264 |
|
|
goto err; |
265 |
|
|
fclose(ifp); |
266 |
|
|
fclose(ofp); |
267 |
|
|
ifp = NULL; |
268 |
|
|
ofp = NULL; |
269 |
|
|
|
270 |
|
|
if (rename(tmppath, msgpath) == -1) { |
271 |
|
|
if (errno == ENOSPC) |
272 |
|
|
return (0); |
273 |
|
|
unlink(tmppath); |
274 |
|
|
log_warn("rename"); |
275 |
|
|
return (0); |
276 |
|
|
} |
277 |
|
|
} |
278 |
|
|
|
279 |
|
|
r = handler_message_commit(msgid, msgpath); |
280 |
|
|
profile_leave(); |
281 |
|
|
|
282 |
|
|
/* in case it's not done by the backend */ |
283 |
|
|
unlink(msgpath); |
284 |
|
|
|
285 |
|
|
log_trace(TRACE_QUEUE, |
286 |
|
|
"queue-backend: queue_message_commit(%08"PRIx32") -> %d", |
287 |
|
|
msgid, r); |
288 |
|
|
|
289 |
|
|
return (r); |
290 |
|
|
|
291 |
|
|
err: |
292 |
|
|
if (ifp) |
293 |
|
|
fclose(ifp); |
294 |
|
|
if (ofp) |
295 |
|
|
fclose(ofp); |
296 |
|
|
return 0; |
297 |
|
|
} |
298 |
|
|
|
299 |
|
|
int |
300 |
|
|
queue_message_corrupt(uint32_t msgid) |
301 |
|
|
{ |
302 |
|
|
int r; |
303 |
|
|
|
304 |
|
|
profile_enter("queue_message_corrupt"); |
305 |
|
|
r = handler_message_corrupt(msgid); |
306 |
|
|
profile_leave(); |
307 |
|
|
|
308 |
|
|
log_trace(TRACE_QUEUE, |
309 |
|
|
"queue-backend: queue_message_corrupt(%08"PRIx32") -> %d", msgid, r); |
310 |
|
|
|
311 |
|
|
return (r); |
312 |
|
|
} |
313 |
|
|
|
314 |
|
|
int |
315 |
|
|
queue_message_uncorrupt(uint32_t msgid) |
316 |
|
|
{ |
317 |
|
|
return handler_message_uncorrupt(msgid); |
318 |
|
|
} |
319 |
|
|
|
320 |
|
|
int |
321 |
|
|
queue_message_fd_r(uint32_t msgid) |
322 |
|
|
{ |
323 |
|
|
int fdin = -1, fdout = -1, fd = -1; |
324 |
|
|
FILE *ifp = NULL; |
325 |
|
|
FILE *ofp = NULL; |
326 |
|
|
|
327 |
|
|
profile_enter("queue_message_fd_r"); |
328 |
|
|
fdin = handler_message_fd_r(msgid); |
329 |
|
|
profile_leave(); |
330 |
|
|
|
331 |
|
|
log_trace(TRACE_QUEUE, |
332 |
|
|
"queue-backend: queue_message_fd_r(%08"PRIx32") -> %d", msgid, fdin); |
333 |
|
|
|
334 |
|
|
if (fdin == -1) |
335 |
|
|
return (-1); |
336 |
|
|
|
337 |
|
|
if (env->sc_queue_flags & QUEUE_ENCRYPTION) { |
338 |
|
|
if ((fdout = mktmpfile()) == -1) |
339 |
|
|
goto err; |
340 |
|
|
if ((fd = dup(fdout)) == -1) |
341 |
|
|
goto err; |
342 |
|
|
if ((ifp = fdopen(fdin, "r")) == NULL) |
343 |
|
|
goto err; |
344 |
|
|
fdin = fd; |
345 |
|
|
fd = -1; |
346 |
|
|
if ((ofp = fdopen(fdout, "w+")) == NULL) |
347 |
|
|
goto err; |
348 |
|
|
|
349 |
|
|
if (!crypto_decrypt_file(ifp, ofp)) |
350 |
|
|
goto err; |
351 |
|
|
|
352 |
|
|
fclose(ifp); |
353 |
|
|
ifp = NULL; |
354 |
|
|
fclose(ofp); |
355 |
|
|
ofp = NULL; |
356 |
|
|
lseek(fdin, SEEK_SET, 0); |
357 |
|
|
} |
358 |
|
|
|
359 |
|
|
if (env->sc_queue_flags & QUEUE_COMPRESSION) { |
360 |
|
|
if ((fdout = mktmpfile()) == -1) |
361 |
|
|
goto err; |
362 |
|
|
if ((fd = dup(fdout)) == -1) |
363 |
|
|
goto err; |
364 |
|
|
if ((ifp = fdopen(fdin, "r")) == NULL) |
365 |
|
|
goto err; |
366 |
|
|
fdin = fd; |
367 |
|
|
fd = -1; |
368 |
|
|
if ((ofp = fdopen(fdout, "w+")) == NULL) |
369 |
|
|
goto err; |
370 |
|
|
|
371 |
|
|
if (!uncompress_file(ifp, ofp)) |
372 |
|
|
goto err; |
373 |
|
|
|
374 |
|
|
fclose(ifp); |
375 |
|
|
ifp = NULL; |
376 |
|
|
fclose(ofp); |
377 |
|
|
ofp = NULL; |
378 |
|
|
lseek(fdin, SEEK_SET, 0); |
379 |
|
|
} |
380 |
|
|
|
381 |
|
|
return (fdin); |
382 |
|
|
|
383 |
|
|
err: |
384 |
|
|
if (fd != -1) |
385 |
|
|
close(fd); |
386 |
|
|
if (fdin != -1) |
387 |
|
|
close(fdin); |
388 |
|
|
if (fdout != -1) |
389 |
|
|
close(fdout); |
390 |
|
|
if (ifp) |
391 |
|
|
fclose(ifp); |
392 |
|
|
if (ofp) |
393 |
|
|
fclose(ofp); |
394 |
|
|
return -1; |
395 |
|
|
} |
396 |
|
|
|
397 |
|
|
int |
398 |
|
|
queue_message_fd_rw(uint32_t msgid) |
399 |
|
|
{ |
400 |
|
|
char buf[PATH_MAX]; |
401 |
|
|
|
402 |
|
|
queue_message_path(msgid, buf, sizeof(buf)); |
403 |
|
|
|
404 |
|
|
return open(buf, O_RDWR | O_CREAT | O_EXCL, 0600); |
405 |
|
|
} |
406 |
|
|
|
407 |
|
|
static int |
408 |
|
|
queue_envelope_dump_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) |
409 |
|
|
{ |
410 |
|
|
char *evp; |
411 |
|
|
size_t evplen; |
412 |
|
|
size_t complen; |
413 |
|
|
char compbuf[sizeof(struct envelope)]; |
414 |
|
|
size_t enclen; |
415 |
|
|
char encbuf[sizeof(struct envelope)]; |
416 |
|
|
|
417 |
|
|
evp = evpbuf; |
418 |
|
|
evplen = envelope_dump_buffer(ep, evpbuf, evpbufsize); |
419 |
|
|
if (evplen == 0) |
420 |
|
|
return (0); |
421 |
|
|
|
422 |
|
|
if (env->sc_queue_flags & QUEUE_COMPRESSION) { |
423 |
|
|
complen = compress_chunk(evp, evplen, compbuf, sizeof compbuf); |
424 |
|
|
if (complen == 0) |
425 |
|
|
return (0); |
426 |
|
|
evp = compbuf; |
427 |
|
|
evplen = complen; |
428 |
|
|
} |
429 |
|
|
|
430 |
|
|
if (env->sc_queue_flags & QUEUE_ENCRYPTION) { |
431 |
|
|
enclen = crypto_encrypt_buffer(evp, evplen, encbuf, sizeof encbuf); |
432 |
|
|
if (enclen == 0) |
433 |
|
|
return (0); |
434 |
|
|
evp = encbuf; |
435 |
|
|
evplen = enclen; |
436 |
|
|
} |
437 |
|
|
|
438 |
|
|
memmove(evpbuf, evp, evplen); |
439 |
|
|
|
440 |
|
|
return (evplen); |
441 |
|
|
} |
442 |
|
|
|
443 |
|
|
static int |
444 |
|
|
queue_envelope_load_buffer(struct envelope *ep, char *evpbuf, size_t evpbufsize) |
445 |
|
|
{ |
446 |
|
|
char *evp; |
447 |
|
|
size_t evplen; |
448 |
|
|
char compbuf[sizeof(struct envelope)]; |
449 |
|
|
size_t complen; |
450 |
|
|
char encbuf[sizeof(struct envelope)]; |
451 |
|
|
size_t enclen; |
452 |
|
|
|
453 |
|
|
evp = evpbuf; |
454 |
|
|
evplen = evpbufsize; |
455 |
|
|
|
456 |
|
|
if (env->sc_queue_flags & QUEUE_ENCRYPTION) { |
457 |
|
|
enclen = crypto_decrypt_buffer(evp, evplen, encbuf, sizeof encbuf); |
458 |
|
|
if (enclen == 0) |
459 |
|
|
return (0); |
460 |
|
|
evp = encbuf; |
461 |
|
|
evplen = enclen; |
462 |
|
|
} |
463 |
|
|
|
464 |
|
|
if (env->sc_queue_flags & QUEUE_COMPRESSION) { |
465 |
|
|
complen = uncompress_chunk(evp, evplen, compbuf, sizeof compbuf); |
466 |
|
|
if (complen == 0) |
467 |
|
|
return (0); |
468 |
|
|
evp = compbuf; |
469 |
|
|
evplen = complen; |
470 |
|
|
} |
471 |
|
|
|
472 |
|
|
return (envelope_load_buffer(ep, evp, evplen)); |
473 |
|
|
} |
474 |
|
|
|
475 |
|
|
static void |
476 |
|
|
queue_envelope_cache_add(struct envelope *e) |
477 |
|
|
{ |
478 |
|
|
struct envelope *cached; |
479 |
|
|
|
480 |
|
|
while (tree_count(&evpcache_tree) >= env->sc_queue_evpcache_size) |
481 |
|
|
queue_envelope_cache_del(TAILQ_LAST(&evpcache_list, evplst)->id); |
482 |
|
|
|
483 |
|
|
cached = xcalloc(1, sizeof *cached, "queue_envelope_cache_add"); |
484 |
|
|
*cached = *e; |
485 |
|
|
TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); |
486 |
|
|
tree_xset(&evpcache_tree, e->id, cached); |
487 |
|
|
stat_increment("queue.evpcache.size", 1); |
488 |
|
|
} |
489 |
|
|
|
490 |
|
|
static void |
491 |
|
|
queue_envelope_cache_update(struct envelope *e) |
492 |
|
|
{ |
493 |
|
|
struct envelope *cached; |
494 |
|
|
|
495 |
|
|
if ((cached = tree_get(&evpcache_tree, e->id)) == NULL) { |
496 |
|
|
queue_envelope_cache_add(e); |
497 |
|
|
stat_increment("queue.evpcache.update.missed", 1); |
498 |
|
|
} else { |
499 |
|
|
TAILQ_REMOVE(&evpcache_list, cached, entry); |
500 |
|
|
*cached = *e; |
501 |
|
|
TAILQ_INSERT_HEAD(&evpcache_list, cached, entry); |
502 |
|
|
stat_increment("queue.evpcache.update.hit", 1); |
503 |
|
|
} |
504 |
|
|
} |
505 |
|
|
|
506 |
|
|
static void |
507 |
|
|
queue_envelope_cache_del(uint64_t evpid) |
508 |
|
|
{ |
509 |
|
|
struct envelope *cached; |
510 |
|
|
|
511 |
|
|
if ((cached = tree_pop(&evpcache_tree, evpid)) == NULL) |
512 |
|
|
return; |
513 |
|
|
|
514 |
|
|
TAILQ_REMOVE(&evpcache_list, cached, entry); |
515 |
|
|
free(cached); |
516 |
|
|
stat_decrement("queue.evpcache.size", 1); |
517 |
|
|
} |
518 |
|
|
|
519 |
|
|
int |
520 |
|
|
queue_envelope_create(struct envelope *ep) |
521 |
|
|
{ |
522 |
|
|
int r; |
523 |
|
|
char evpbuf[sizeof(struct envelope)]; |
524 |
|
|
size_t evplen; |
525 |
|
|
uint64_t evpid; |
526 |
|
|
uint32_t msgid; |
527 |
|
|
|
528 |
|
|
ep->creation = time(NULL); |
529 |
|
|
evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); |
530 |
|
|
if (evplen == 0) |
531 |
|
|
return (0); |
532 |
|
|
|
533 |
|
|
evpid = ep->id; |
534 |
|
|
msgid = evpid_to_msgid(evpid); |
535 |
|
|
|
536 |
|
|
profile_enter("queue_envelope_create"); |
537 |
|
|
r = handler_envelope_create(msgid, evpbuf, evplen, &ep->id); |
538 |
|
|
profile_leave(); |
539 |
|
|
|
540 |
|
|
log_trace(TRACE_QUEUE, |
541 |
|
|
"queue-backend: queue_envelope_create(%016"PRIx64", %zu) -> %d (%016"PRIx64")", |
542 |
|
|
evpid, evplen, r, ep->id); |
543 |
|
|
|
544 |
|
|
if (!r) { |
545 |
|
|
ep->creation = 0; |
546 |
|
|
ep->id = 0; |
547 |
|
|
} |
548 |
|
|
|
549 |
|
|
if (r && env->sc_queue_flags & QUEUE_EVPCACHE) |
550 |
|
|
queue_envelope_cache_add(ep); |
551 |
|
|
|
552 |
|
|
return (r); |
553 |
|
|
} |
554 |
|
|
|
555 |
|
|
int |
556 |
|
|
queue_envelope_delete(uint64_t evpid) |
557 |
|
|
{ |
558 |
|
|
int r; |
559 |
|
|
|
560 |
|
|
if (env->sc_queue_flags & QUEUE_EVPCACHE) |
561 |
|
|
queue_envelope_cache_del(evpid); |
562 |
|
|
|
563 |
|
|
profile_enter("queue_envelope_delete"); |
564 |
|
|
r = handler_envelope_delete(evpid); |
565 |
|
|
profile_leave(); |
566 |
|
|
|
567 |
|
|
log_trace(TRACE_QUEUE, |
568 |
|
|
"queue-backend: queue_envelope_delete(%016"PRIx64") -> %d", |
569 |
|
|
evpid, r); |
570 |
|
|
|
571 |
|
|
return (r); |
572 |
|
|
} |
573 |
|
|
|
574 |
|
|
int |
575 |
|
|
queue_envelope_load(uint64_t evpid, struct envelope *ep) |
576 |
|
|
{ |
577 |
|
|
const char *e; |
578 |
|
|
char evpbuf[sizeof(struct envelope)]; |
579 |
|
|
size_t evplen; |
580 |
|
|
struct envelope *cached; |
581 |
|
|
|
582 |
|
|
if ((env->sc_queue_flags & QUEUE_EVPCACHE) && |
583 |
|
|
(cached = tree_get(&evpcache_tree, evpid))) { |
584 |
|
|
*ep = *cached; |
585 |
|
|
stat_increment("queue.evpcache.load.hit", 1); |
586 |
|
|
return (1); |
587 |
|
|
} |
588 |
|
|
|
589 |
|
|
ep->id = evpid; |
590 |
|
|
profile_enter("queue_envelope_load"); |
591 |
|
|
evplen = handler_envelope_load(ep->id, evpbuf, sizeof evpbuf); |
592 |
|
|
profile_leave(); |
593 |
|
|
|
594 |
|
|
log_trace(TRACE_QUEUE, |
595 |
|
|
"queue-backend: queue_envelope_load(%016"PRIx64") -> %zu", |
596 |
|
|
evpid, evplen); |
597 |
|
|
|
598 |
|
|
if (evplen == 0) |
599 |
|
|
return (0); |
600 |
|
|
|
601 |
|
|
if (queue_envelope_load_buffer(ep, evpbuf, evplen)) { |
602 |
|
|
if ((e = envelope_validate(ep)) == NULL) { |
603 |
|
|
ep->id = evpid; |
604 |
|
|
if (env->sc_queue_flags & QUEUE_EVPCACHE) { |
605 |
|
|
queue_envelope_cache_add(ep); |
606 |
|
|
stat_increment("queue.evpcache.load.missed", 1); |
607 |
|
|
} |
608 |
|
|
return (1); |
609 |
|
|
} |
610 |
|
|
log_debug("debug: invalid envelope %016" PRIx64 ": %s", |
611 |
|
|
ep->id, e); |
612 |
|
|
} |
613 |
|
|
|
614 |
|
|
(void)queue_message_corrupt(evpid_to_msgid(evpid)); |
615 |
|
|
return (0); |
616 |
|
|
} |
617 |
|
|
|
618 |
|
|
int |
619 |
|
|
queue_envelope_update(struct envelope *ep) |
620 |
|
|
{ |
621 |
|
|
char evpbuf[sizeof(struct envelope)]; |
622 |
|
|
size_t evplen; |
623 |
|
|
int r; |
624 |
|
|
|
625 |
|
|
evplen = queue_envelope_dump_buffer(ep, evpbuf, sizeof evpbuf); |
626 |
|
|
if (evplen == 0) |
627 |
|
|
return (0); |
628 |
|
|
|
629 |
|
|
profile_enter("queue_envelope_update"); |
630 |
|
|
r = handler_envelope_update(ep->id, evpbuf, evplen); |
631 |
|
|
profile_leave(); |
632 |
|
|
|
633 |
|
|
if (r && env->sc_queue_flags & QUEUE_EVPCACHE) |
634 |
|
|
queue_envelope_cache_update(ep); |
635 |
|
|
|
636 |
|
|
log_trace(TRACE_QUEUE, |
637 |
|
|
"queue-backend: queue_envelope_update(%016"PRIx64") -> %d", |
638 |
|
|
ep->id, r); |
639 |
|
|
|
640 |
|
|
return (r); |
641 |
|
|
} |
642 |
|
|
|
643 |
|
|
int |
644 |
|
|
queue_message_walk(struct envelope *ep, uint32_t msgid, int *done, void **data) |
645 |
|
|
{ |
646 |
|
|
char evpbuf[sizeof(struct envelope)]; |
647 |
|
|
uint64_t evpid; |
648 |
|
|
int r; |
649 |
|
|
const char *e; |
650 |
|
|
|
651 |
|
|
profile_enter("queue_message_walk"); |
652 |
|
|
r = handler_message_walk(&evpid, evpbuf, sizeof evpbuf, |
653 |
|
|
msgid, done, data); |
654 |
|
|
profile_leave(); |
655 |
|
|
|
656 |
|
|
log_trace(TRACE_QUEUE, |
657 |
|
|
"queue-backend: queue_message_walk() -> %d (%016"PRIx64")", |
658 |
|
|
r, evpid); |
659 |
|
|
|
660 |
|
|
if (r == -1) |
661 |
|
|
return (r); |
662 |
|
|
|
663 |
|
|
if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { |
664 |
|
|
if ((e = envelope_validate(ep)) == NULL) { |
665 |
|
|
ep->id = evpid; |
666 |
|
|
/* |
667 |
|
|
* do not cache the envelope here, while discovering |
668 |
|
|
* envelopes one could re-run discover on already |
669 |
|
|
* scheduled envelopes which leads to triggering of |
670 |
|
|
* strict checks in caching. Envelopes could anyway |
671 |
|
|
* be loaded from backend if it isn't cached. |
672 |
|
|
*/ |
673 |
|
|
return (1); |
674 |
|
|
} |
675 |
|
|
log_debug("debug: invalid envelope %016" PRIx64 ": %s", |
676 |
|
|
ep->id, e); |
677 |
|
|
(void)queue_message_corrupt(evpid_to_msgid(evpid)); |
678 |
|
|
} |
679 |
|
|
|
680 |
|
|
return (0); |
681 |
|
|
} |
682 |
|
|
|
683 |
|
|
int |
684 |
|
|
queue_envelope_walk(struct envelope *ep) |
685 |
|
|
{ |
686 |
|
|
const char *e; |
687 |
|
|
uint64_t evpid; |
688 |
|
|
char evpbuf[sizeof(struct envelope)]; |
689 |
|
|
int r; |
690 |
|
|
|
691 |
|
|
profile_enter("queue_envelope_walk"); |
692 |
|
|
r = handler_envelope_walk(&evpid, evpbuf, sizeof evpbuf); |
693 |
|
|
profile_leave(); |
694 |
|
|
|
695 |
|
|
log_trace(TRACE_QUEUE, |
696 |
|
|
"queue-backend: queue_envelope_walk() -> %d (%016"PRIx64")", |
697 |
|
|
r, evpid); |
698 |
|
|
|
699 |
|
|
if (r == -1) |
700 |
|
|
return (r); |
701 |
|
|
|
702 |
|
|
if (r && queue_envelope_load_buffer(ep, evpbuf, (size_t)r)) { |
703 |
|
|
if ((e = envelope_validate(ep)) == NULL) { |
704 |
|
|
ep->id = evpid; |
705 |
|
|
if (env->sc_queue_flags & QUEUE_EVPCACHE) |
706 |
|
|
queue_envelope_cache_add(ep); |
707 |
|
|
return (1); |
708 |
|
|
} |
709 |
|
|
log_debug("debug: invalid envelope %016" PRIx64 ": %s", |
710 |
|
|
ep->id, e); |
711 |
|
|
(void)queue_message_corrupt(evpid_to_msgid(evpid)); |
712 |
|
|
} |
713 |
|
|
|
714 |
|
|
return (0); |
715 |
|
|
} |
716 |
|
|
|
717 |
|
|
uint32_t |
718 |
|
|
queue_generate_msgid(void) |
719 |
|
|
{ |
720 |
|
|
uint32_t msgid; |
721 |
|
|
|
722 |
|
|
while ((msgid = arc4random()) == 0) |
723 |
|
|
; |
724 |
|
|
|
725 |
|
|
return msgid; |
726 |
|
|
} |
727 |
|
|
|
728 |
|
|
uint64_t |
729 |
|
|
queue_generate_evpid(uint32_t msgid) |
730 |
|
|
{ |
731 |
|
|
uint32_t rnd; |
732 |
|
|
uint64_t evpid; |
733 |
|
|
|
734 |
|
|
while ((rnd = arc4random()) == 0) |
735 |
|
|
; |
736 |
|
|
|
737 |
|
|
evpid = msgid; |
738 |
|
|
evpid <<= 32; |
739 |
|
|
evpid |= rnd; |
740 |
|
|
|
741 |
|
|
return evpid; |
742 |
|
|
} |
743 |
|
|
|
744 |
|
|
static const char* |
745 |
|
|
envelope_validate(struct envelope *ep) |
746 |
|
|
{ |
747 |
|
|
if (ep->version != SMTPD_ENVELOPE_VERSION) |
748 |
|
|
return "version mismatch"; |
749 |
|
|
|
750 |
|
|
if (memchr(ep->helo, '\0', sizeof(ep->helo)) == NULL) |
751 |
|
|
return "invalid helo"; |
752 |
|
|
if (ep->helo[0] == '\0') |
753 |
|
|
return "empty helo"; |
754 |
|
|
|
755 |
|
|
if (memchr(ep->hostname, '\0', sizeof(ep->hostname)) == NULL) |
756 |
|
|
return "invalid hostname"; |
757 |
|
|
if (ep->hostname[0] == '\0') |
758 |
|
|
return "empty hostname"; |
759 |
|
|
|
760 |
|
|
if (memchr(ep->errorline, '\0', sizeof(ep->errorline)) == NULL) |
761 |
|
|
return "invalid error line"; |
762 |
|
|
|
763 |
|
|
return NULL; |
764 |
|
|
} |
765 |
|
|
|
766 |
|
|
void |
767 |
|
|
queue_api_on_close(int(*cb)(void)) |
768 |
|
|
{ |
769 |
|
|
handler_close = cb; |
770 |
|
|
} |
771 |
|
|
|
772 |
|
|
void |
773 |
|
|
queue_api_on_message_create(int(*cb)(uint32_t *)) |
774 |
|
|
{ |
775 |
|
|
handler_message_create = cb; |
776 |
|
|
} |
777 |
|
|
|
778 |
|
|
void |
779 |
|
|
queue_api_on_message_commit(int(*cb)(uint32_t, const char *)) |
780 |
|
|
{ |
781 |
|
|
handler_message_commit = cb; |
782 |
|
|
} |
783 |
|
|
|
784 |
|
|
void |
785 |
|
|
queue_api_on_message_delete(int(*cb)(uint32_t)) |
786 |
|
|
{ |
787 |
|
|
handler_message_delete = cb; |
788 |
|
|
} |
789 |
|
|
|
790 |
|
|
void |
791 |
|
|
queue_api_on_message_fd_r(int(*cb)(uint32_t)) |
792 |
|
|
{ |
793 |
|
|
handler_message_fd_r = cb; |
794 |
|
|
} |
795 |
|
|
|
796 |
|
|
void |
797 |
|
|
queue_api_on_message_corrupt(int(*cb)(uint32_t)) |
798 |
|
|
{ |
799 |
|
|
handler_message_corrupt = cb; |
800 |
|
|
} |
801 |
|
|
|
802 |
|
|
void |
803 |
|
|
queue_api_on_message_uncorrupt(int(*cb)(uint32_t)) |
804 |
|
|
{ |
805 |
|
|
handler_message_uncorrupt = cb; |
806 |
|
|
} |
807 |
|
|
|
808 |
|
|
void |
809 |
|
|
queue_api_on_envelope_create(int(*cb)(uint32_t, const char *, size_t, uint64_t *)) |
810 |
|
|
{ |
811 |
|
|
handler_envelope_create = cb; |
812 |
|
|
} |
813 |
|
|
|
814 |
|
|
void |
815 |
|
|
queue_api_on_envelope_delete(int(*cb)(uint64_t)) |
816 |
|
|
{ |
817 |
|
|
handler_envelope_delete = cb; |
818 |
|
|
} |
819 |
|
|
|
820 |
|
|
void |
821 |
|
|
queue_api_on_envelope_update(int(*cb)(uint64_t, const char *, size_t)) |
822 |
|
|
{ |
823 |
|
|
handler_envelope_update = cb; |
824 |
|
|
} |
825 |
|
|
|
826 |
|
|
void |
827 |
|
|
queue_api_on_envelope_load(int(*cb)(uint64_t, char *, size_t)) |
828 |
|
|
{ |
829 |
|
|
handler_envelope_load = cb; |
830 |
|
|
} |
831 |
|
|
|
832 |
|
|
void |
833 |
|
|
queue_api_on_envelope_walk(int(*cb)(uint64_t *, char *, size_t)) |
834 |
|
|
{ |
835 |
|
|
handler_envelope_walk = cb; |
836 |
|
|
} |
837 |
|
|
|
838 |
|
|
void |
839 |
|
|
queue_api_on_message_walk(int(*cb)(uint64_t *, char *, size_t, |
840 |
|
|
uint32_t, int *, void **)) |
841 |
|
|
{ |
842 |
|
|
handler_message_walk = cb; |
843 |
|
|
} |