1package queue
2
3import (
4 "bufio"
5 "context"
6 "crypto/ed25519"
7 cryptorand "crypto/rand"
8 "crypto/sha256"
9 "crypto/tls"
10 "crypto/x509"
11 "fmt"
12 "io"
13 "math/big"
14 "net"
15 "os"
16 "path/filepath"
17 "reflect"
18 "strings"
19 "testing"
20 "time"
21
22 "github.com/mjl-/adns"
23 "github.com/mjl-/bstore"
24
25 "github.com/mjl-/mox/dns"
26 "github.com/mjl-/mox/mlog"
27 "github.com/mjl-/mox/mox-"
28 "github.com/mjl-/mox/smtp"
29 "github.com/mjl-/mox/smtpclient"
30 "github.com/mjl-/mox/store"
31 "github.com/mjl-/mox/tlsrpt"
32 "github.com/mjl-/mox/tlsrptdb"
33)
34
35var ctxbg = context.Background()
36var pkglog = mlog.New("queue", nil)
37
// tcheck aborts the test with "msg: err" when err is non-nil, and does
// nothing otherwise. The failure is attributed to the caller's line.
func tcheck(t *testing.T, err error, msg string) {
	if err == nil {
		return
	}
	t.Helper()
	t.Fatalf("%s: %s", msg, err)
}
44
// tcompare aborts the test when got and exp are not deeply equal according
// to reflect.DeepEqual. The failure is attributed to the caller's line.
func tcompare(t *testing.T, got, exp any) {
	t.Helper()
	if reflect.DeepEqual(got, exp) {
		return
	}
	t.Fatalf("got %v, expected %v", got, exp)
}
51
52var keepAccount bool
53
54func setup(t *testing.T) (*store.Account, func()) {
55 // Prepare config so email can be delivered to mjl@mox.example.
56
57 // Don't trigger the account consistency checks. Only remove account files on first
58 // (of randomized) runs.
59 if !keepAccount {
60 os.RemoveAll("../testdata/queue/data")
61 keepAccount = true
62 } else {
63 os.RemoveAll("../testdata/queue/data/queue")
64 }
65
66 log := mlog.New("queue", nil)
67 mox.Context = ctxbg
68 mox.ConfigStaticPath = filepath.FromSlash("../testdata/queue/mox.conf")
69 mox.MustLoadConfig(true, false)
70 acc, err := store.OpenAccount(log, "mjl")
71 tcheck(t, err, "open account")
72 err = acc.SetPassword(log, "testtest")
73 tcheck(t, err, "set password")
74 switchStop := store.Switchboard()
75 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
76 return acc, func() {
77 acc.Close()
78 mox.ShutdownCancel()
79 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
80 Shutdown()
81 switchStop()
82 }
83}
84
// testmsg is the message enqueued by the tests: three header lines, an empty
// line and a one-line body, every line terminated by CRLF.
var testmsg = strings.Join([]string{
	"From: <mjl@mox.example>",
	"To: <mjl@mox.example>",
	"Subject: test",
	"",
	"test email",
	"",
}, "\r\n")
91
92func prepareFile(t *testing.T) *os.File {
93 t.Helper()
94 msgFile, err := store.CreateMessageTemp(pkglog, "queue")
95 tcheck(t, err, "create temp message for delivery to queue")
96 _, err = msgFile.Write([]byte(testmsg))
97 tcheck(t, err, "write message file")
98 return msgFile
99}
100
101func TestQueue(t *testing.T) {
102 acc, cleanup := setup(t)
103 defer cleanup()
104 err := Init()
105 tcheck(t, err, "queue init")
106
107 msgs, err := List(ctxbg)
108 tcheck(t, err, "listing messages in queue")
109 if len(msgs) != 0 {
110 t.Fatalf("got %d messages in queue, expected 0", len(msgs))
111 }
112
113 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
114 mf := prepareFile(t)
115 defer os.Remove(mf.Name())
116 defer mf.Close()
117
118 var qm Msg
119
120 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())
121 err = Add(ctxbg, pkglog, "mjl", mf, qm)
122 tcheck(t, err, "add message to queue for delivery")
123
124 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())
125 err = Add(ctxbg, pkglog, "mjl", mf, qm)
126 tcheck(t, err, "add message to queue for delivery")
127
128 msgs, err = List(ctxbg)
129 tcheck(t, err, "listing queue")
130 if len(msgs) != 2 {
131 t.Fatalf("got msgs %v, expected 1", msgs)
132 }
133 msg := msgs[0]
134 if msg.Attempts != 0 {
135 t.Fatalf("msg attempts %d, expected 0", msg.Attempts)
136 }
137 n, err := Drop(ctxbg, pkglog, msgs[1].ID, "", "")
138 tcheck(t, err, "drop")
139 if n != 1 {
140 t.Fatalf("dropped %d, expected 1", n)
141 }
142 if _, err := os.Stat(msgs[1].MessagePath()); err == nil || !os.IsNotExist(err) {
143 t.Fatalf("dropped message not removed from file system")
144 }
145
146 next := nextWork(ctxbg, pkglog, nil)
147 if next > 0 {
148 t.Fatalf("nextWork in %s, should be now", next)
149 }
150 busy := map[string]struct{}{"mox.example": {}}
151 if x := nextWork(ctxbg, pkglog, busy); x != 24*time.Hour {
152 t.Fatalf("nextWork in %s for busy domain, should be in 24 hours", x)
153 }
154 if nn := launchWork(pkglog, nil, busy); nn != 0 {
155 t.Fatalf("launchWork launched %d deliveries, expected 0", nn)
156 }
157
158 mailDomain := dns.Domain{ASCII: "mox.example"}
159 mailHost := dns.Domain{ASCII: "mail.mox.example"}
160 resolver := dns.MockResolver{
161 A: map[string][]string{
162 "mail.mox.example.": {"127.0.0.1"},
163 "submission.example.": {"127.0.0.1"},
164 },
165 MX: map[string][]*net.MX{
166 "mox.example.": {{Host: "mail.mox.example", Pref: 10}},
167 "other.example.": {{Host: "mail.mox.example", Pref: 10}},
168 },
169 }
170 // Override dial function. We'll make connecting fail for now.
171 dialed := make(chan struct{}, 1)
172 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
173 dialed <- struct{}{}
174 return nil, fmt.Errorf("failure from test")
175 }
176 defer func() {
177 smtpclient.DialHook = nil
178 }()
179
180 launchWork(pkglog, resolver, map[string]struct{}{})
181
182 moxCert := fakeCert(t, "mail.mox.example", false)
183
184 // Wait until we see the dial and the failed attempt.
185 timer := time.NewTimer(time.Second)
186 defer timer.Stop()
187 select {
188 case <-dialed:
189 i := 0
190 for {
191 m, err := bstore.QueryDB[Msg](ctxbg, DB).Get()
192 tcheck(t, err, "get")
193 if m.Attempts == 1 {
194 break
195 }
196 i++
197 if i == 10 {
198 t.Fatalf("message in queue not updated")
199 }
200 time.Sleep(100 * time.Millisecond)
201 }
202 case <-timer.C:
203 t.Fatalf("no dial within 1s")
204 }
205 <-deliveryResults // Deliver sends here.
206
207 _, err = OpenMessage(ctxbg, msg.ID+1)
208 if err != bstore.ErrAbsent {
209 t.Fatalf("OpenMessage, got %v, expected ErrAbsent", err)
210 }
211 reader, err := OpenMessage(ctxbg, msg.ID)
212 tcheck(t, err, "open message")
213 defer reader.Close()
214 msgbuf, err := io.ReadAll(reader)
215 tcheck(t, err, "read message")
216 if string(msgbuf) != testmsg {
217 t.Fatalf("message mismatch, got %q, expected %q", string(msgbuf), testmsg)
218 }
219
220 n, err = Kick(ctxbg, msg.ID+1, "", "", nil)
221 tcheck(t, err, "kick")
222 if n != 0 {
223 t.Fatalf("kick %d, expected 0", n)
224 }
225 n, err = Kick(ctxbg, msg.ID, "", "", nil)
226 tcheck(t, err, "kick")
227 if n != 1 {
228 t.Fatalf("kicked %d, expected 1", n)
229 }
230
231 smtpdone := make(chan struct{})
232
233 nfakeSMTPServer := func(server net.Conn, rcpts, ntx int, onercpt bool, extensions []string) {
234 defer func() {
235 smtpdone <- struct{}{}
236 }()
237
238 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
239 // cyclic dependencies.
240 fmt.Fprintf(server, "220 mail.mox.example\r\n")
241 br := bufio.NewReader(server)
242
243 readline := func(cmd string) {
244 line, err := br.ReadString('\n')
245 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
246 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
247 }
248 }
249 writeline := func(s string) {
250 fmt.Fprintf(server, "%s\r\n", s)
251 }
252
253 readline("ehlo")
254 writeline("250-mail.mox.example")
255 for _, ext := range extensions {
256 writeline("250-" + ext)
257 }
258 writeline("250 pipelining")
259 for tx := 0; tx < ntx; tx++ {
260 readline("mail")
261 writeline("250 ok")
262 for i := 0; i < rcpts; i++ {
263 readline("rcpt")
264 if onercpt && i > 0 {
265 writeline("552 ok")
266 } else {
267 writeline("250 ok")
268 }
269 }
270 readline("data")
271 writeline("354 continue")
272 reader := smtp.NewDataReader(br)
273 io.Copy(io.Discard, reader)
274 writeline("250 ok")
275 }
276 readline("quit")
277 writeline("221 ok")
278 }
279 fakeSMTPServer := func(server net.Conn) {
280 nfakeSMTPServer(server, 1, 1, false, nil)
281 }
282 fakeSMTPServer2Rcpts := func(server net.Conn) {
283 nfakeSMTPServer(server, 2, 1, false, nil)
284 }
285 fakeSMTPServerLimitRcpt1 := func(server net.Conn) {
286 nfakeSMTPServer(server, 1, 2, false, []string{"LIMITS RCPTMAX=1"})
287 }
288 // Server that returns an error after first recipient. We expect another
289 // transaction to deliver the second message.
290 fakeSMTPServerRcpt1 := func(server net.Conn) {
291 defer func() {
292 smtpdone <- struct{}{}
293 }()
294
295 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
296 // cyclic dependencies.
297 fmt.Fprintf(server, "220 mail.mox.example\r\n")
298 br := bufio.NewReader(server)
299
300 readline := func(cmd string) {
301 line, err := br.ReadString('\n')
302 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
303 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
304 }
305 }
306 writeline := func(s string) {
307 fmt.Fprintf(server, "%s\r\n", s)
308 }
309
310 readline("ehlo")
311 writeline("250-mail.mox.example")
312 writeline("250 pipelining")
313
314 readline("mail")
315 writeline("250 ok")
316 readline("rcpt")
317 writeline("250 ok")
318 readline("rcpt")
319 writeline("552 ok")
320 readline("data")
321 writeline("354 continue")
322 reader := smtp.NewDataReader(br)
323 io.Copy(io.Discard, reader)
324 writeline("250 ok")
325
326 readline("mail")
327 writeline("250 ok")
328 readline("rcpt")
329 writeline("250 ok")
330 readline("data")
331 writeline("354 continue")
332 reader = smtp.NewDataReader(br)
333 io.Copy(io.Discard, reader)
334 writeline("250 ok")
335
336 readline("quit")
337 writeline("221 ok")
338 }
339
340 goodTLSConfig := tls.Config{Certificates: []tls.Certificate{moxCert}}
341 makeFakeSMTPSTARTTLSServer := func(tlsConfig *tls.Config, nstarttls int, requiretls bool) func(server net.Conn) {
342 attempt := 0
343 return func(server net.Conn) {
344 defer func() {
345 smtpdone <- struct{}{}
346 }()
347
348 attempt++
349
350 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
351 // cyclic dependencies.
352 fmt.Fprintf(server, "220 mail.mox.example\r\n")
353 br := bufio.NewReader(server)
354
355 readline := func(cmd string) {
356 line, err := br.ReadString('\n')
357 if err == nil && !strings.HasPrefix(strings.ToLower(line), cmd) {
358 panic(fmt.Sprintf("unexpected line %q, expected %q", line, cmd))
359 }
360 }
361 writeline := func(s string) {
362 fmt.Fprintf(server, "%s\r\n", s)
363 }
364
365 readline("ehlo")
366 writeline("250-mail.mox.example")
367 writeline("250 starttls")
368 if nstarttls == 0 || attempt <= nstarttls {
369 readline("starttls")
370 writeline("220 ok")
371 tlsConn := tls.Server(server, tlsConfig)
372 err := tlsConn.Handshake()
373 if err != nil {
374 return
375 }
376 server = tlsConn
377 br = bufio.NewReader(server)
378
379 readline("ehlo")
380 if requiretls {
381 writeline("250-mail.mox.example")
382 writeline("250 requiretls")
383 } else {
384 writeline("250 mail.mox.example")
385 }
386 }
387 readline("mail")
388 writeline("250 ok")
389 readline("rcpt")
390 writeline("250 ok")
391 readline("data")
392 writeline("354 continue")
393 reader := smtp.NewDataReader(br)
394 io.Copy(io.Discard, reader)
395 writeline("250 ok")
396 readline("quit")
397 writeline("221 ok")
398 }
399 }
400
401 fakeSMTPSTARTTLSServer := makeFakeSMTPSTARTTLSServer(&goodTLSConfig, 0, true)
402 makeBadFakeSMTPSTARTTLSServer := func(requiretls bool) func(server net.Conn) {
403 return makeFakeSMTPSTARTTLSServer(&tls.Config{MaxVersion: tls.VersionTLS10, Certificates: []tls.Certificate{moxCert}}, 1, requiretls)
404 }
405
406 nfakeSubmitServer := func(server net.Conn, nrcpt int) {
407 defer func() {
408 smtpdone <- struct{}{}
409 }()
410
411 // We do a minimal fake smtp server. We cannot import smtpserver.Serve due to
412 // cyclic dependencies.
413 fmt.Fprintf(server, "220 mail.mox.example\r\n")
414 br := bufio.NewReader(server)
415 br.ReadString('\n') // Should be EHLO.
416 fmt.Fprintf(server, "250-localhost\r\n")
417 fmt.Fprintf(server, "250 AUTH PLAIN\r\n")
418 br.ReadString('\n') // Should be AUTH PLAIN
419 fmt.Fprintf(server, "235 2.7.0 auth ok\r\n")
420 br.ReadString('\n') // Should be MAIL FROM.
421 fmt.Fprintf(server, "250 ok\r\n")
422 for i := 0; i < nrcpt; i++ {
423 br.ReadString('\n') // Should be RCPT TO.
424 fmt.Fprintf(server, "250 ok\r\n")
425 }
426 br.ReadString('\n') // Should be DATA.
427 fmt.Fprintf(server, "354 continue\r\n")
428 reader := smtp.NewDataReader(br)
429 io.Copy(io.Discard, reader)
430 fmt.Fprintf(server, "250 ok\r\n")
431 br.ReadString('\n') // Should be QUIT.
432 fmt.Fprintf(server, "221 ok\r\n")
433 }
434 fakeSubmitServer := func(server net.Conn) {
435 nfakeSubmitServer(server, 1)
436 }
437 fakeSubmitServer2Rcpts := func(server net.Conn) {
438 nfakeSubmitServer(server, 2)
439 }
440
441 testQueue := func(expectDSN bool, fakeServer func(conn net.Conn)) bool {
442 t.Helper()
443
444 var pipes []net.Conn
445 defer func() {
446 for _, conn := range pipes {
447 conn.Close()
448 }
449 }()
450
451 var wasNetDialer bool
452 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
453 // Setting up a pipe. We'll start a fake smtp server on the server-side. And return the
454 // client-side to the invocation dial, for the attempted delivery from the queue.
455 server, client := net.Pipe()
456 pipes = append(pipes, server, client)
457 go fakeServer(server)
458
459 _, wasNetDialer = dialer.(*net.Dialer)
460
461 // For reconnects, we are already waiting for delivery below.
462 select {
463 case dialed <- struct{}{}:
464 default:
465 }
466
467 return client, nil
468 }
469 defer func() {
470 smtpclient.DialHook = nil
471 }()
472
473 inbox, err := bstore.QueryDB[store.Mailbox](ctxbg, acc.DB).FilterNonzero(store.Mailbox{Name: "Inbox"}).Get()
474 tcheck(t, err, "get inbox")
475
476 inboxCount, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
477 tcheck(t, err, "querying messages in inbox")
478
479 waitDeliver := func() {
480 t.Helper()
481 timer.Reset(time.Second)
482 select {
483 case <-dialed:
484 select {
485 case <-smtpdone:
486 i := 0
487 for {
488 xmsgs, err := List(ctxbg)
489 tcheck(t, err, "list queue")
490 if len(xmsgs) == 0 {
491 ninbox, err := bstore.QueryDB[store.Message](ctxbg, acc.DB).FilterNonzero(store.Message{MailboxID: inbox.ID}).Count()
492 tcheck(t, err, "querying messages in inbox")
493 if expectDSN && ninbox != inboxCount+1 {
494 t.Fatalf("got %d messages in inbox, previously %d, expected 1 additional for dsn", ninbox, inboxCount)
495 } else if !expectDSN && ninbox != inboxCount {
496 t.Fatalf("got %d messages in inbox, previously %d, expected no additional messages", ninbox, inboxCount)
497 }
498
499 break
500 }
501 i++
502 if i == 10 {
503 t.Fatalf("%d messages in queue, expected 0", len(xmsgs))
504 }
505 time.Sleep(100 * time.Millisecond)
506 }
507 case <-timer.C:
508 t.Fatalf("no deliver within 1s")
509 }
510 case <-timer.C:
511 t.Fatalf("no dial within 1s")
512 }
513 <-deliveryResults // Deliver sends here.
514 }
515
516 launchWork(pkglog, resolver, map[string]struct{}{})
517 waitDeliver()
518 return wasNetDialer
519 }
520 testDeliver := func(fakeServer func(conn net.Conn)) bool {
521 t.Helper()
522 return testQueue(false, fakeServer)
523 }
524 testDSN := func(fakeServer func(conn net.Conn)) bool {
525 t.Helper()
526 return testQueue(true, fakeServer)
527 }
528
529 // Test direct delivery.
530 wasNetDialer := testDeliver(fakeSMTPServer)
531 if !wasNetDialer {
532 t.Fatalf("expected net.Dialer as dialer")
533 }
534
535 // Single delivery to two recipients at same domain, expecting single connection
536 // and single transaction.
537 qm0 := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())
538 qml := []Msg{qm0, qm0} // Same NextAttempt.
539 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
540 tcheck(t, err, "add messages to queue for delivery")
541 testDeliver(fakeSMTPServer2Rcpts)
542
543 // Single enqueue to two recipients at different domain, expecting two connections.
544 otheraddr, _ := smtp.ParseAddress("mjl@other.example")
545 otherpath := otheraddr.Path()
546 t0 := time.Now()
547 qml = []Msg{
548 MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0),
549 MakeMsg(path, otherpath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, t0),
550 }
551 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
552 tcheck(t, err, "add messages to queue for delivery")
553 conns := ConnectionCounter()
554 testDeliver(fakeSMTPServer)
555 nconns := ConnectionCounter()
556 if nconns != conns+2 {
557 t.Errorf("saw %d connections, expected 2", nconns-conns)
558 }
559
560 // Single enqueue with two recipients at same domain, but with smtp server that has
561 // LIMITS RCPTMAX=1, so we expect a single connection with two transactions.
562 qml = []Msg{qm0, qm0}
563 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
564 tcheck(t, err, "add messages to queue for delivery")
565 testDeliver(fakeSMTPServerLimitRcpt1)
566
567 // Single enqueue with two recipients at same domain, but smtp server sends 552 for
568 // 2nd recipient, so we expect a single connection with two transactions.
569 qml = []Msg{qm0, qm0}
570 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
571 tcheck(t, err, "add messages to queue for delivery")
572 testDeliver(fakeSMTPServerRcpt1)
573
574 // Add a message to be delivered with submit because of its route.
575 topath := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "submit.example"}}}
576 qm = MakeMsg(path, topath, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())
577 err = Add(ctxbg, pkglog, "mjl", mf, qm)
578 tcheck(t, err, "add message to queue for delivery")
579 wasNetDialer = testDeliver(fakeSubmitServer)
580 if !wasNetDialer {
581 t.Fatalf("expected net.Dialer as dialer")
582 }
583
584 // Two messages for submission.
585 qml = []Msg{qm, qm}
586 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
587 tcheck(t, err, "add messages to queue for delivery")
588 wasNetDialer = testDeliver(fakeSubmitServer2Rcpts)
589 if !wasNetDialer {
590 t.Fatalf("expected net.Dialer as dialer")
591 }
592
593 // Add a message to be delivered with submit because of explicitly configured transport, that uses TLS.
594 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())}
595 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
596 tcheck(t, err, "add message to queue for delivery")
597 transportSubmitTLS := "submittls"
598 n, err = Kick(ctxbg, qml[0].ID, "", "", &transportSubmitTLS)
599 tcheck(t, err, "kick queue")
600 if n != 1 {
601 t.Fatalf("kick changed %d messages, expected 1", n)
602 }
603 // Make fake cert, and make it trusted.
604 cert := fakeCert(t, "submission.example", false)
605 mox.Conf.Static.TLS.CertPool = x509.NewCertPool()
606 mox.Conf.Static.TLS.CertPool.AddCert(cert.Leaf)
607 tlsConfig := tls.Config{
608 Certificates: []tls.Certificate{cert},
609 }
610 wasNetDialer = testDeliver(func(conn net.Conn) {
611 conn = tls.Server(conn, &tlsConfig)
612 fakeSubmitServer(conn)
613 })
614 if !wasNetDialer {
615 t.Fatalf("expected net.Dialer as dialer")
616 }
617
618 // Various failure reasons.
619 fdNotTrusted := tlsrpt.FailureDetails{
620 ResultType: tlsrpt.ResultCertificateNotTrusted,
621 SendingMTAIP: "", // Missing due to pipe.
622 ReceivingMXHostname: "mail.mox.example",
623 ReceivingMXHelo: "mail.mox.example",
624 ReceivingIP: "", // Missing due to pipe.
625 FailedSessionCount: 1,
626 FailureReasonCode: "",
627 }
628 fdTLSAUnusable := tlsrpt.FailureDetails{
629 ResultType: tlsrpt.ResultTLSAInvalid,
630 ReceivingMXHostname: "mail.mox.example",
631 FailedSessionCount: 0,
632 FailureReasonCode: "all-unusable-records+ignored",
633 }
634 fdBadProtocol := tlsrpt.FailureDetails{
635 ResultType: tlsrpt.ResultValidationFailure,
636 ReceivingMXHostname: "mail.mox.example",
637 ReceivingMXHelo: "mail.mox.example",
638 FailedSessionCount: 1,
639 FailureReasonCode: "tls-remote-alert-70-protocol-version-not-supported",
640 }
641
642 // Add a message to be delivered with socks.
643 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<socks@localhost>", nil, nil, time.Now())}
644 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
645 tcheck(t, err, "add message to queue for delivery")
646 transportSocks := "socks"
647 n, err = Kick(ctxbg, qml[0].ID, "", "", &transportSocks)
648 tcheck(t, err, "kick queue")
649 if n != 1 {
650 t.Fatalf("kick changed %d messages, expected 1", n)
651 }
652 wasNetDialer = testDeliver(fakeSMTPServer)
653 if wasNetDialer {
654 t.Fatalf("expected non-net.Dialer as dialer") // SOCKS5 dialer is a private type, we cannot check for it.
655 }
656
657 // Add message to be delivered with opportunistic TLS verification.
658 clearTLSResults(t)
659 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, nil, time.Now())}
660 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
661 tcheck(t, err, "add message to queue for delivery")
662 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
663 tcheck(t, err, "kick queue")
664 if n != 1 {
665 t.Fatalf("kick changed %d messages, expected 1", n)
666 }
667 testDeliver(fakeSMTPSTARTTLSServer)
668 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
669 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost)))
670
671 // Test fallback to plain text with TLS handshake fails.
672 clearTLSResults(t)
673 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<badtls@localhost>", nil, nil, time.Now())}
674 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
675 tcheck(t, err, "add message to queue for delivery")
676 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
677 tcheck(t, err, "kick queue")
678 if n != 1 {
679 t.Fatalf("kick changed %d messages, expected 1", n)
680 }
681 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
682 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
683 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
684
685 // Add message to be delivered with DANE verification.
686 clearTLSResults(t)
687 resolver.AllAuthentic = true
688 resolver.TLSA = map[string][]adns.TLSA{
689 "_25._tcp.mail.mox.example.": {
690 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: moxCert.Leaf.RawSubjectPublicKeyInfo},
691 },
692 }
693 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<dane@localhost>", nil, nil, time.Now())}
694 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
695 tcheck(t, err, "add message to queue for delivery")
696 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
697 tcheck(t, err, "kick queue")
698 if n != 1 {
699 t.Fatalf("kick changed %d messages, expected 1", n)
700 }
701 testDeliver(fakeSMTPSTARTTLSServer)
702 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
703 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy(resolver.TLSA["_25._tcp.mail.mox.example."], mailHost), FailureDetails: []tlsrpt.FailureDetails{}}))
704
705 // We should know starttls/requiretls by now.
706 rdt := store.RecipientDomainTLS{Domain: "mox.example"}
707 err = acc.DB.Get(ctxbg, &rdt)
708 tcheck(t, err, "get recipientdomaintls")
709 tcompare(t, rdt.STARTTLS, true)
710 tcompare(t, rdt.RequireTLS, true)
711
712 // Add message to be delivered with verified TLS and REQUIRETLS.
713 yes := true
714 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<opportunistictls@localhost>", nil, &yes, time.Now())}
715 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
716 tcheck(t, err, "add message to queue for delivery")
717 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
718 tcheck(t, err, "kick queue")
719 if n != 1 {
720 t.Fatalf("kick changed %d messages, expected 1", n)
721 }
722 testDeliver(fakeSMTPSTARTTLSServer)
723
724 // Check that message is delivered with all unusable DANE records.
725 clearTLSResults(t)
726 resolver.TLSA = map[string][]adns.TLSA{
727 "_25._tcp.mail.mox.example.": {
728 {},
729 },
730 }
731 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneunusable@localhost>", nil, nil, time.Now())}
732 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
733 tcheck(t, err, "add message to queue for delivery")
734 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
735 tcheck(t, err, "kick queue")
736 if n != 1 {
737 t.Fatalf("kick changed %d messages, expected 1", n)
738 }
739 testDeliver(fakeSMTPSTARTTLSServer)
740 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(1, 0, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdNotTrusted)))
741 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(1, 0, tlsrpt.Result{Policy: tlsrpt.TLSAPolicy([]adns.TLSA{}, mailHost), FailureDetails: []tlsrpt.FailureDetails{fdTLSAUnusable}}))
742
743 // Check that message is delivered with insecure TLSA records. They should be
744 // ignored and regular STARTTLS tried.
745 clearTLSResults(t)
746 resolver.Inauthentic = []string{"tlsa _25._tcp.mail.mox.example."}
747 resolver.TLSA = map[string][]adns.TLSA{
748 "_25._tcp.mail.mox.example.": {
749 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeFull, CertAssoc: make([]byte, sha256.Size)},
750 },
751 }
752 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<daneinsecure@localhost>", nil, nil, time.Now())}
753 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
754 tcheck(t, err, "add message to queue for delivery")
755 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
756 tcheck(t, err, "kick queue")
757 if n != 1 {
758 t.Fatalf("kick changed %d messages, expected 1", n)
759 }
760 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
761 resolver.Inauthentic = nil
762 checkTLSResults(t, "mox.example", "mox.example", false, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailDomain, fdBadProtocol)))
763 checkTLSResults(t, "mail.mox.example", "mox.example", true, addCounts(0, 1, tlsrpt.MakeResult(tlsrpt.NoPolicyFound, mailHost, fdBadProtocol)))
764
765 // STARTTLS failed, so not known supported.
766 rdt = store.RecipientDomainTLS{Domain: "mox.example"}
767 err = acc.DB.Get(ctxbg, &rdt)
768 tcheck(t, err, "get recipientdomaintls")
769 tcompare(t, rdt.STARTTLS, false)
770 tcompare(t, rdt.RequireTLS, false)
771
772 // Check that message is delivered with TLS-Required: No and non-matching DANE record.
773 no := false
774 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednostarttls@localhost>", nil, &no, time.Now())}
775 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
776 tcheck(t, err, "add message to queue for delivery")
777 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
778 tcheck(t, err, "kick queue")
779 if n != 1 {
780 t.Fatalf("kick changed %d messages, expected 1", n)
781 }
782 testDeliver(fakeSMTPSTARTTLSServer)
783
784 // Check that message is delivered with TLS-Required: No and bad TLS, falling back to plain text.
785 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednoplaintext@localhost>", nil, &no, time.Now())}
786 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
787 tcheck(t, err, "add message to queue for delivery")
788 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
789 tcheck(t, err, "kick queue")
790 if n != 1 {
791 t.Fatalf("kick changed %d messages, expected 1", n)
792 }
793 testDeliver(makeBadFakeSMTPSTARTTLSServer(true))
794
795 // Add message with requiretls that fails immediately due to no REQUIRETLS support in all servers.
796 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequiredunsupported@localhost>", nil, &yes, time.Now())}
797 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
798 tcheck(t, err, "add message to queue for delivery")
799 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
800 tcheck(t, err, "kick queue")
801 if n != 1 {
802 t.Fatalf("kick changed %d messages, expected 1", n)
803 }
804 testDSN(makeBadFakeSMTPSTARTTLSServer(false))
805
806 // Restore pre-DANE behaviour.
807 resolver.AllAuthentic = false
808 resolver.TLSA = nil
809
810 // Add message with requiretls that fails immediately due to no verification policy for recipient domain.
811 qml = []Msg{MakeMsg(path, path, false, false, int64(len(testmsg)), "<tlsrequirednopolicy@localhost>", nil, &yes, time.Now())}
812 err = Add(ctxbg, pkglog, "mjl", mf, qml...)
813 tcheck(t, err, "add message to queue for delivery")
814 n, err = Kick(ctxbg, qml[0].ID, "", "", nil)
815 tcheck(t, err, "kick queue")
816 if n != 1 {
817 t.Fatalf("kick changed %d messages, expected 1", n)
818 }
819 // Based on DNS lookups, there won't be any dialing or SMTP connection.
820 dialed <- struct{}{}
821 testDSN(func(conn net.Conn) {
822 smtpdone <- struct{}{}
823 })
824
825 // Add another message that we'll fail to deliver entirely.
826 qm = MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())
827 err = Add(ctxbg, pkglog, "mjl", mf, qm)
828 tcheck(t, err, "add message to queue for delivery")
829
830 msgs, err = List(ctxbg)
831 tcheck(t, err, "list queue")
832 if len(msgs) != 1 {
833 t.Fatalf("queue has %d messages, expected 1", len(msgs))
834 }
835 msg = msgs[0]
836
837 prepServer := func(fn func(c net.Conn)) (net.Conn, func()) {
838 server, client := net.Pipe()
839 go func() {
840 fn(server)
841 server.Close()
842 }()
843 return client, func() {
844 server.Close()
845 client.Close()
846 }
847 }
848
849 conn2, cleanup2 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "220 mail.mox.example\r\n") })
850 conn3, cleanup3 := prepServer(func(conn net.Conn) { fmt.Fprintf(conn, "451 mail.mox.example\r\n") })
851 conn4, cleanup4 := prepServer(fakeSMTPSTARTTLSServer)
852 defer func() {
853 cleanup2()
854 cleanup3()
855 cleanup4()
856 }()
857
858 seq := 0
859 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
860 seq++
861 switch seq {
862 default:
863 return nil, fmt.Errorf("connect error from test")
864 case 2:
865 return conn2, nil
866 case 3:
867 return conn3, nil
868 case 4:
869 return conn4, nil
870 }
871 }
872 defer func() {
873 smtpclient.DialHook = nil
874 }()
875
876 comm := store.RegisterComm(acc)
877 defer comm.Unregister()
878
879 for i := 1; i < 8; i++ {
880 go func() { <-deliveryResults }() // Deliver sends here.
881 if i == 4 {
882 resolver.AllAuthentic = true
883 resolver.TLSA = map[string][]adns.TLSA{
884 "_25._tcp.mail.mox.example.": {
885 // Non-matching zero CertAssoc, should cause failure.
886 {Usage: adns.TLSAUsageDANEEE, Selector: adns.TLSASelectorSPKI, MatchType: adns.TLSAMatchTypeSHA256, CertAssoc: make([]byte, sha256.Size)},
887 },
888 }
889 } else {
890 resolver.AllAuthentic = false
891 resolver.TLSA = nil
892 }
893 deliver(pkglog, resolver, msg)
894 err = DB.Get(ctxbg, &msg)
895 tcheck(t, err, "get msg")
896 if msg.Attempts != i {
897 t.Fatalf("got attempt %d, expected %d", msg.Attempts, i)
898 }
899 if msg.Attempts == 5 {
900 timer.Reset(time.Second)
901 changes := make(chan struct{}, 1)
902 go func() {
903 comm.Get()
904 changes <- struct{}{}
905 }()
906 select {
907 case <-changes:
908 case <-timer.C:
909 t.Fatalf("no dsn in 1s")
910 }
911 }
912 }
913
914 // Trigger final failure.
915 go func() { <-deliveryResults }() // Deliver sends here.
916 deliver(pkglog, resolver, msg)
917 err = DB.Get(ctxbg, &msg)
918 if err != bstore.ErrAbsent {
919 t.Fatalf("attempt to fetch delivered and removed message from queue, got err %v, expected ErrAbsent", err)
920 }
921
922 timer.Reset(time.Second)
923 changes := make(chan struct{}, 1)
924 go func() {
925 comm.Get()
926 changes <- struct{}{}
927 }()
928 select {
929 case <-changes:
930 case <-timer.C:
931 t.Fatalf("no dsn in 1s")
932 }
933}
934
935func addCounts(success, failure int64, result tlsrpt.Result) tlsrpt.Result {
936 result.Summary.TotalSuccessfulSessionCount += success
937 result.Summary.TotalFailureSessionCount += failure
938 return result
939}
940
941func clearTLSResults(t *testing.T) {
942 _, err := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB).Delete()
943 tcheck(t, err, "delete tls results")
944}
945
946func checkTLSResults(t *testing.T, policyDomain, expRecipientDomain string, expIsHost bool, expResults ...tlsrpt.Result) {
947 t.Helper()
948 q := bstore.QueryDB[tlsrptdb.TLSResult](ctxbg, tlsrptdb.ResultDB)
949 q.FilterNonzero(tlsrptdb.TLSResult{PolicyDomain: policyDomain})
950 result, err := q.Get()
951 tcheck(t, err, "get tls result")
952 tcompare(t, result.RecipientDomain, expRecipientDomain)
953 tcompare(t, result.IsHost, expIsHost)
954
955 // Before comparing, compensate for go1.20 vs go1.21 difference.
956 for i, r := range result.Results {
957 for j, fd := range r.FailureDetails {
958 if fd.FailureReasonCode == "tls-remote-alert-70" {
959 result.Results[i].FailureDetails[j].FailureReasonCode = "tls-remote-alert-70-protocol-version-not-supported"
960 }
961 }
962 }
963 tcompare(t, result.Results, expResults)
964}
965
966// test Start and that it attempts to deliver.
967func TestQueueStart(t *testing.T) {
968 // Override dial function. We'll make connecting fail and check the attempt.
969 resolver := dns.MockResolver{
970 A: map[string][]string{"mox.example.": {"127.0.0.1"}},
971 MX: map[string][]*net.MX{"mox.example.": {{Host: "mox.example", Pref: 10}}},
972 }
973 dialed := make(chan struct{}, 1)
974 smtpclient.DialHook = func(ctx context.Context, dialer smtpclient.Dialer, timeout time.Duration, addr string, laddr net.Addr) (net.Conn, error) {
975 dialed <- struct{}{}
976 return nil, fmt.Errorf("failure from test")
977 }
978 defer func() {
979 smtpclient.DialHook = nil
980 }()
981
982 _, cleanup := setup(t)
983 defer cleanup()
984 done := make(chan struct{}, 1)
985 defer func() {
986 mox.ShutdownCancel()
987 <-done
988 mox.Shutdown, mox.ShutdownCancel = context.WithCancel(ctxbg)
989 }()
990 err := Start(resolver, done)
991 tcheck(t, err, "queue start")
992
993 checkDialed := func(need bool) {
994 t.Helper()
995 d := time.Second / 10
996 if need {
997 d = time.Second
998 }
999 timer := time.NewTimer(d)
1000 defer timer.Stop()
1001 select {
1002 case <-dialed:
1003 if !need {
1004 t.Fatalf("unexpected dial attempt")
1005 }
1006 case <-timer.C:
1007 if need {
1008 t.Fatalf("expected to see a dial attempt")
1009 }
1010 }
1011 }
1012
1013 path := smtp.Path{Localpart: "mjl", IPDomain: dns.IPDomain{Domain: dns.Domain{ASCII: "mox.example"}}}
1014 mf := prepareFile(t)
1015 defer os.Remove(mf.Name())
1016 defer mf.Close()
1017 qm := MakeMsg(path, path, false, false, int64(len(testmsg)), "<test@localhost>", nil, nil, time.Now())
1018 err = Add(ctxbg, pkglog, "mjl", mf, qm)
1019 tcheck(t, err, "add message to queue for delivery")
1020 checkDialed(true)
1021
1022 // Don't change message nextattempt time, but kick queue. Message should not be delivered.
1023 queuekick()
1024 checkDialed(false)
1025
1026 // Kick for real, should see another attempt.
1027 n, err := Kick(ctxbg, 0, "mox.example", "", nil)
1028 tcheck(t, err, "kick queue")
1029 if n != 1 {
1030 t.Fatalf("kick changed %d messages, expected 1", n)
1031 }
1032 checkDialed(true)
1033 time.Sleep(100 * time.Millisecond) // Racy... we won't get notified when work is done...
1034}
1035
// fakeCert returns a self-signed certificate for DNS name name that merely
// appears valid: it is signed with a fixed, all-zero-seed ed25519 key and must
// never be used outside tests.
//
// The certificate's NotBefore is one hour in the past. NotAfter is one hour in
// the future, or, when expired is set, one hour in the past as well. The
// returned tls.Certificate has Leaf populated with the parsed certificate.
// The test is failed if generating or parsing the certificate fails.
func fakeCert(t *testing.T, name string, expired bool) tls.Certificate {
	// Report failures at the caller's line, not inside this helper.
	t.Helper()

	// Read the clock once so both validity bounds derive from the same instant
	// and NotBefore can never end up after NotAfter.
	now := time.Now()
	notBefore := now.Add(-time.Hour)
	notAfter := now.Add(time.Hour)
	if expired {
		notAfter = now.Add(-time.Hour)
	}

	privKey := ed25519.NewKeyFromSeed(make([]byte, ed25519.SeedSize)) // Fake key, don't use this for real!
	template := &x509.Certificate{
		SerialNumber: big.NewInt(1), // Required field...
		DNSNames:     []string{name},
		NotBefore:    notBefore,
		NotAfter:     notAfter,
	}
	// Self-signed: the template serves as its own parent.
	localCertBuf, err := x509.CreateCertificate(cryptorand.Reader, template, template, privKey.Public(), privKey)
	if err != nil {
		t.Fatalf("making certificate: %s", err)
	}
	cert, err := x509.ParseCertificate(localCertBuf)
	if err != nil {
		t.Fatalf("parsing generated certificate: %s", err)
	}
	return tls.Certificate{
		Certificate: [][]byte{localCertBuf},
		PrivateKey:  privKey,
		Leaf:        cert,
	}
}
1067