1// Package queue is in charge of outgoing messages, queueing them when submitted,
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
3// for delayed or failed deliveries.
4package queue
5
6import (
7 "context"
8 "fmt"
9 "io"
10 "net"
11 "os"
12 "path/filepath"
13 "runtime/debug"
14 "sort"
15 "strings"
16 "time"
17
18 "golang.org/x/net/proxy"
19
20 "github.com/prometheus/client_golang/prometheus"
21 "github.com/prometheus/client_golang/prometheus/promauto"
22
23 "github.com/mjl-/bstore"
24
25 "github.com/mjl-/mox/config"
26 "github.com/mjl-/mox/dns"
27 "github.com/mjl-/mox/dsn"
28 "github.com/mjl-/mox/metrics"
29 "github.com/mjl-/mox/mlog"
30 "github.com/mjl-/mox/mox-"
31 "github.com/mjl-/mox/moxio"
32 "github.com/mjl-/mox/smtp"
33 "github.com/mjl-/mox/smtpclient"
34 "github.com/mjl-/mox/store"
35 "github.com/mjl-/mox/tlsrpt"
36 "github.com/mjl-/mox/tlsrptdb"
37)
38
39var xlog = mlog.New("queue")
40
41var (
42 metricConnection = promauto.NewCounterVec(
43 prometheus.CounterOpts{
44 Name: "mox_queue_connection_total",
45 Help: "Queue client connections, outgoing.",
46 },
47 []string{
48 "result", // "ok", "timeout", "canceled", "error"
49 },
50 )
51 metricDelivery = promauto.NewHistogramVec(
52 prometheus.HistogramOpts{
53 Name: "mox_queue_delivery_duration_seconds",
54 Help: "SMTP client delivery attempt to single host.",
55 Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
56 },
57 []string{
58 "attempt", // Number of attempts.
59 "transport", // empty for default direct delivery.
60 "tlsmode", // immediate, requiredstarttls, opportunistic, skip (from smtpclient.TLSMode), with optional +mtasts and/or +dane.
61 "result", // ok, timeout, canceled, temperror, permerror, error
62 },
63 )
64)
65
66var jitter = mox.NewPseudoRand()
67
68var DBTypes = []any{Msg{}} // Types stored in DB.
69var DB *bstore.DB // Exported for making backups.
70
71// Set for mox localserve, to prevent queueing.
72var Localserve bool
73
74// Msg is a message in the queue.
75//
76// Use MakeMsg to make a message with fields that Add needs. Add will further set
77// queueing related fields.
78type Msg struct {
79 ID int64
80 Queued time.Time `bstore:"default now"`
81 SenderAccount string // Failures are delivered back to this local account. Also used for routing.
82 SenderLocalpart smtp.Localpart // Should be a local user and domain.
83 SenderDomain dns.IPDomain
84 RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
85 RecipientDomain dns.IPDomain
86 RecipientDomainStr string // For filtering.
87 Attempts int // Next attempt is based on last attempt and exponential back off based on attempts.
88 MaxAttempts int // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
89 DialedIPs map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
90 NextAttempt time.Time // For scheduling.
91 LastAttempt *time.Time
92 LastError string
93
94 Has8bit bool // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
95 SMTPUTF8 bool // Whether message requires use of SMTPUTF8.
96 IsDMARCReport bool // Delivery failures for DMARC reports are handled differently.
97 IsTLSReport bool // Delivery failures for TLS reports are handled differently.
98 Size int64 // Full size of message, combined MsgPrefix with contents of message file.
99 MessageID string // Used when composing a DSN, in its References header.
100 MsgPrefix []byte
101
102 // If set, this message is a DSN and this is a version using utf-8, for the case
103 // the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
104 // relevant.
105 DSNUTF8 []byte
106
107 // If non-empty, the transport to use for this message. Can be set through cli or
108 // admin interface. If empty (the default for a submitted message), regular routing
109 // rules apply.
110 Transport string
111
112 // RequireTLS influences TLS verification during delivery.
113 //
114 // If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
115 // back to optional opportunistic non-verified STARTTLS.
116 //
117 // If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
118 // MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
119 // server.
120 //
121 // If RequireTLS is false (through messag header "TLS-Required: No"), the recipient
122 // domain's policy is ignored if it does not lead to a successful TLS connection,
123 // i.e. falling back to SMTP delivery with unverified STARTTLS or plain text.
124 RequireTLS *bool
125 // ../rfc/8689:250
126}
127
128// Sender of message as used in MAIL FROM.
129func (m Msg) Sender() smtp.Path {
130 return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
131}
132
133// Recipient of message as used in RCPT TO.
134func (m Msg) Recipient() smtp.Path {
135 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
136}
137
138// MessagePath returns the path where the message is stored.
139func (m Msg) MessagePath() string {
140 return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
141}
142
143// Init opens the queue database without starting delivery.
144func Init() error {
145 qpath := mox.DataDirPath(filepath.FromSlash("queue/index.db"))
146 os.MkdirAll(filepath.Dir(qpath), 0770)
147 isNew := false
148 if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
149 isNew = true
150 }
151
152 var err error
153 DB, err = bstore.Open(mox.Shutdown, qpath, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, DBTypes...)
154 if err != nil {
155 if isNew {
156 os.Remove(qpath)
157 }
158 return fmt.Errorf("open queue database: %s", err)
159 }
160 return nil
161}
162
163// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
164func Shutdown() {
165 err := DB.Close()
166 xlog.Check(err, "closing queue db")
167 DB = nil
168}
169
170// List returns all messages in the delivery queue.
171// Ordered by earliest delivery attempt first.
172func List(ctx context.Context) ([]Msg, error) {
173 qmsgs, err := bstore.QueryDB[Msg](ctx, DB).List()
174 if err != nil {
175 return nil, err
176 }
177 sort.Slice(qmsgs, func(i, j int) bool {
178 a := qmsgs[i]
179 b := qmsgs[j]
180 la := a.LastAttempt != nil
181 lb := b.LastAttempt != nil
182 if !la && lb {
183 return true
184 } else if la && !lb {
185 return false
186 }
187 if !la && !lb || a.LastAttempt.Equal(*b.LastAttempt) {
188 return a.ID < b.ID
189 }
190 return a.LastAttempt.Before(*b.LastAttempt)
191 })
192 return qmsgs, nil
193}
194
195// Count returns the number of messages in the delivery queue.
196func Count(ctx context.Context) (int, error) {
197 return bstore.QueryDB[Msg](ctx, DB).Count()
198}
199
200// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
201func MakeMsg(senderAccount string, sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool) Msg {
202 return Msg{
203 SenderAccount: mox.Conf.Static.Postmaster.Account,
204 SenderLocalpart: sender.Localpart,
205 SenderDomain: sender.IPDomain,
206 RecipientLocalpart: recipient.Localpart,
207 RecipientDomain: recipient.IPDomain,
208 Has8bit: has8bit,
209 SMTPUTF8: smtputf8,
210 Size: size,
211 MessageID: messageID,
212 MsgPrefix: prefix,
213 RequireTLS: requireTLS,
214 }
215}
216
217// Add a new message to the queue. The queue is kicked immediately to start a
218// first delivery attempt.
219//
220// ID must be 0 and will be set after inserting in the queue.
221//
222// Add sets derived fields like RecipientDomainStr, and fields related to queueing,
223// such as Queued, NextAttempt, LastAttempt, LastError.
224func Add(ctx context.Context, log *mlog.Log, qm *Msg, msgFile *os.File) error {
225 // todo: Add should accept multiple rcptTo if they are for the same domain. so we can queue them for delivery in one (or just a few) session(s), transferring the data only once. ../rfc/5321:3759
226
227 if qm.ID != 0 {
228 return fmt.Errorf("id of queued message must be 0")
229 }
230 qm.Queued = time.Now()
231 qm.DialedIPs = nil
232 qm.NextAttempt = qm.Queued
233 qm.LastAttempt = nil
234 qm.LastError = ""
235 qm.RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
236
237 if Localserve {
238 if qm.SenderAccount == "" {
239 return fmt.Errorf("cannot queue with localserve without local account")
240 }
241 acc, err := store.OpenAccount(qm.SenderAccount)
242 if err != nil {
243 return fmt.Errorf("opening sender account for immediate delivery with localserve: %v", err)
244 }
245 defer func() {
246 err := acc.Close()
247 log.Check(err, "closing account")
248 }()
249 m := store.Message{Size: qm.Size, MsgPrefix: qm.MsgPrefix}
250 conf, _ := acc.Conf()
251 dest := conf.Destinations[qm.Sender().String()]
252 acc.WithWLock(func() {
253 err = acc.DeliverDestination(log, dest, &m, msgFile)
254 })
255 if err != nil {
256 return fmt.Errorf("delivering message: %v", err)
257 }
258 log.Debug("immediately delivered from queue to sender")
259 return nil
260 }
261
262 tx, err := DB.Begin(ctx, true)
263 if err != nil {
264 return fmt.Errorf("begin transaction: %w", err)
265 }
266 defer func() {
267 if tx != nil {
268 if err := tx.Rollback(); err != nil {
269 log.Errorx("rollback for queue", err)
270 }
271 }
272 }()
273
274 if err := tx.Insert(qm); err != nil {
275 return err
276 }
277
278 dst := qm.MessagePath()
279 defer func() {
280 if dst != "" {
281 err := os.Remove(dst)
282 log.Check(err, "removing destination message file for queue", mlog.Field("path", dst))
283 }
284 }()
285 dstDir := filepath.Dir(dst)
286 os.MkdirAll(dstDir, 0770)
287 if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
288 return fmt.Errorf("linking/copying message to new file: %s", err)
289 } else if err := moxio.SyncDir(dstDir); err != nil {
290 return fmt.Errorf("sync directory: %v", err)
291 }
292
293 if err := tx.Commit(); err != nil {
294 return fmt.Errorf("commit transaction: %s", err)
295 }
296 tx = nil
297 dst = ""
298
299 queuekick()
300 return nil
301}
302
303func formatIPDomain(d dns.IPDomain) string {
304 if len(d.IP) > 0 {
305 return "[" + d.IP.String() + "]"
306 }
307 return d.Domain.Name()
308}
309
310var (
311 kick = make(chan struct{}, 1)
312 deliveryResult = make(chan string, 1)
313)
314
315func queuekick() {
316 select {
317 case kick <- struct{}{}:
318 default:
319 }
320}
321
322// Kick sets the NextAttempt for messages matching all filter parameters (ID,
323// toDomain, recipient) that are nonzero, and kicks the queue, attempting delivery
324// of those messages. If all parameters are zero, all messages are kicked. If
325// transport is set, the delivery attempts for the matching messages will use the
326// transport. An empty string is the default transport, i.e. direct delivery.
327// Returns number of messages queued for immediate delivery.
328func Kick(ctx context.Context, ID int64, toDomain, recipient string, transport *string) (int, error) {
329 q := bstore.QueryDB[Msg](ctx, DB)
330 if ID > 0 {
331 q.FilterID(ID)
332 }
333 if toDomain != "" {
334 q.FilterEqual("RecipientDomainStr", toDomain)
335 }
336 if recipient != "" {
337 q.FilterFn(func(qm Msg) bool {
338 return qm.Recipient().XString(true) == recipient
339 })
340 }
341 up := map[string]any{"NextAttempt": time.Now()}
342 if transport != nil {
343 if *transport != "" {
344 _, ok := mox.Conf.Static.Transports[*transport]
345 if !ok {
346 return 0, fmt.Errorf("unknown transport %q", *transport)
347 }
348 }
349 up["Transport"] = *transport
350 }
351 n, err := q.UpdateFields(up)
352 if err != nil {
353 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
354 }
355 queuekick()
356 return n, nil
357}
358
359// Drop removes messages from the queue that match all nonzero parameters.
360// If all parameters are zero, all messages are removed.
361// Returns number of messages removed.
362func Drop(ctx context.Context, ID int64, toDomain string, recipient string) (int, error) {
363 q := bstore.QueryDB[Msg](ctx, DB)
364 if ID > 0 {
365 q.FilterID(ID)
366 }
367 if toDomain != "" {
368 q.FilterEqual("RecipientDomainStr", toDomain)
369 }
370 if recipient != "" {
371 q.FilterFn(func(qm Msg) bool {
372 return qm.Recipient().XString(true) == recipient
373 })
374 }
375 var msgs []Msg
376 q.Gather(&msgs)
377 n, err := q.Delete()
378 if err != nil {
379 return 0, fmt.Errorf("selecting and deleting messages from queue: %v", err)
380 }
381 for _, m := range msgs {
382 p := m.MessagePath()
383 if err := os.Remove(p); err != nil {
384 xlog.WithContext(ctx).Errorx("removing queue message from file system", err, mlog.Field("queuemsgid", m.ID), mlog.Field("path", p))
385 }
386 }
387 return n, nil
388}
389
390// SaveRequireTLS updates the RequireTLS field of the message with id.
391func SaveRequireTLS(ctx context.Context, id int64, requireTLS *bool) error {
392 return DB.Write(ctx, func(tx *bstore.Tx) error {
393 m := Msg{ID: id}
394 if err := tx.Get(&m); err != nil {
395 return fmt.Errorf("get message: %w", err)
396 }
397 m.RequireTLS = requireTLS
398 return tx.Update(&m)
399 })
400}
401
402type ReadReaderAtCloser interface {
403 io.ReadCloser
404 io.ReaderAt
405}
406
407// OpenMessage opens a message present in the queue.
408func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
409 qm := Msg{ID: id}
410 err := DB.Get(ctx, &qm)
411 if err != nil {
412 return nil, err
413 }
414 f, err := os.Open(qm.MessagePath())
415 if err != nil {
416 return nil, fmt.Errorf("open message file: %s", err)
417 }
418 r := store.FileMsgReader(qm.MsgPrefix, f)
419 return r, err
420}
421
422const maxConcurrentDeliveries = 10
423
424// Start opens the database by calling Init, then starts the delivery process.
425func Start(resolver dns.Resolver, done chan struct{}) error {
426 if err := Init(); err != nil {
427 return err
428 }
429
430 // High-level delivery strategy advice: ../rfc/5321:3685
431 go func() {
432 // Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
433 busyDomains := map[string]struct{}{}
434
435 timer := time.NewTimer(0)
436
437 for {
438 select {
439 case <-mox.Shutdown.Done():
440 done <- struct{}{}
441 return
442 case <-kick:
443 case <-timer.C:
444 case domain := <-deliveryResult:
445 delete(busyDomains, domain)
446 }
447
448 if len(busyDomains) >= maxConcurrentDeliveries {
449 continue
450 }
451
452 launchWork(resolver, busyDomains)
453 timer.Reset(nextWork(mox.Shutdown, busyDomains))
454 }
455 }()
456 return nil
457}
458
459func nextWork(ctx context.Context, busyDomains map[string]struct{}) time.Duration {
460 q := bstore.QueryDB[Msg](ctx, DB)
461 if len(busyDomains) > 0 {
462 var doms []any
463 for d := range busyDomains {
464 doms = append(doms, d)
465 }
466 q.FilterNotEqual("RecipientDomainStr", doms...)
467 }
468 q.SortAsc("NextAttempt")
469 q.Limit(1)
470 qm, err := q.Get()
471 if err == bstore.ErrAbsent {
472 return 24 * time.Hour
473 } else if err != nil {
474 xlog.Errorx("finding time for next delivery attempt", err)
475 return 1 * time.Minute
476 }
477 return time.Until(qm.NextAttempt)
478}
479
480func launchWork(resolver dns.Resolver, busyDomains map[string]struct{}) int {
481 q := bstore.QueryDB[Msg](mox.Shutdown, DB)
482 q.FilterLessEqual("NextAttempt", time.Now())
483 q.SortAsc("NextAttempt")
484 q.Limit(maxConcurrentDeliveries)
485 if len(busyDomains) > 0 {
486 var doms []any
487 for d := range busyDomains {
488 doms = append(doms, d)
489 }
490 q.FilterNotEqual("RecipientDomainStr", doms...)
491 }
492 msgs, err := q.List()
493 if err != nil {
494 xlog.Errorx("querying for work in queue", err)
495 mox.Sleep(mox.Shutdown, 1*time.Second)
496 return -1
497 }
498
499 for _, m := range msgs {
500 busyDomains[formatIPDomain(m.RecipientDomain)] = struct{}{}
501 go deliver(resolver, m)
502 }
503 return len(msgs)
504}
505
506// Remove message from queue in database and file system.
507func queueDelete(ctx context.Context, msgID int64) error {
508 if err := DB.Delete(ctx, &Msg{ID: msgID}); err != nil {
509 return err
510 }
511 // If removing from database fails, we'll also leave the file in the file system.
512
513 p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(msgID)))
514 if err := os.Remove(p); err != nil {
515 return fmt.Errorf("removing queue message from file system: %v", err)
516 }
517
518 return nil
519}
520
521// deliver attempts to deliver a message.
522// The queue is updated, either by removing a delivered or permanently failed
523// message, or updating the time for the next attempt. A DSN may be sent.
524func deliver(resolver dns.Resolver, m Msg) {
525 cid := mox.Cid()
526 qlog := xlog.WithCid(cid).Fields(mlog.Field("from", m.Sender()), mlog.Field("recipient", m.Recipient()), mlog.Field("attempts", m.Attempts), mlog.Field("msgid", m.ID))
527
528 defer func() {
529 deliveryResult <- formatIPDomain(m.RecipientDomain)
530
531 x := recover()
532 if x != nil {
533 qlog.Error("deliver panic", mlog.Field("panic", x))
534 debug.PrintStack()
535 metrics.PanicInc(metrics.Queue)
536 }
537 }()
538
539 // We register this attempt by setting last_attempt, and already next_attempt time
540 // in the future with exponential backoff. If we run into trouble delivery below,
541 // at least we won't be bothering the receiving server with our problems.
542 // Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
543 // 8h, 16h (send permanent failure DSN).
544 // ../rfc/5321:3703
545 // todo future: make the back off times configurable. ../rfc/5321:3713
546 backoff := time.Duration(7*60+30+jitter.Intn(10)-5) * time.Second
547 for i := 0; i < m.Attempts; i++ {
548 backoff *= time.Duration(2)
549 }
550 m.Attempts++
551 now := time.Now()
552 m.LastAttempt = &now
553 m.NextAttempt = now.Add(backoff)
554 qup := bstore.QueryDB[Msg](mox.Shutdown, DB)
555 qup.FilterID(m.ID)
556 update := Msg{Attempts: m.Attempts, NextAttempt: m.NextAttempt, LastAttempt: m.LastAttempt}
557 if _, err := qup.UpdateNonzero(update); err != nil {
558 qlog.Errorx("storing delivery attempt", err)
559 return
560 }
561
562 // Find route for transport to use for delivery attempt.
563 var transport config.Transport
564 var transportName string
565 if m.Transport != "" {
566 var ok bool
567 transport, ok = mox.Conf.Static.Transports[m.Transport]
568 if !ok {
569 var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027
570 fail(qlog, m, backoff, false, remoteMTA, "", fmt.Sprintf("cannot find transport %q", m.Transport))
571 return
572 }
573 transportName = m.Transport
574 } else {
575 route := findRoute(m.Attempts-1, m)
576 transport = route.ResolvedTransport
577 transportName = route.Transport
578 }
579
580 if transportName != "" {
581 qlog = qlog.Fields(mlog.Field("transport", transportName))
582 qlog.Debug("delivering with transport", mlog.Field("transport", transportName))
583 }
584
585 // We gather TLS connection successes and failures during delivery, and we store
586 // them in tlsrptb. Every 24 hours we send an email with a report to the recipient
587 // domains that opt in via a TLSRPT DNS record. For us, the tricky part is
588 // collecting all reporting information. We've got several TLS modes
589 // (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
590 // Failures can happen at various levels: MTA-STS policies (apply to whole delivery
591 // attempt/domain), MX targets (possibly multiple per delivery attempt, both for
592 // MTA-STS and DANE).
593 //
594 // Once the SMTP client has tried a TLS handshake, we register success/failure,
595 // regardless of what happens next on the connection. We also register failures
596 // when they happen before we get to the SMTP client, but only if they are related
597 // to TLS (and some DNSSEC).
598 var recipientDomainResult tlsrpt.Result
599 var hostResults []tlsrpt.Result
600 defer func() {
601 if mox.Conf.Static.NoOutgoingTLSReports || m.RecipientDomain.IsIP() {
602 return
603 }
604
605 now := time.Now()
606 dayUTC := now.UTC().Format("20060102")
607
608 // See if this contains a failure. If not, we'll mark TLS results for delivering
609 // DMARC reports SendReport false, so we won't as easily get into a report sending
610 // loop.
611 var failure bool
612 for _, result := range hostResults {
613 if result.Summary.TotalFailureSessionCount > 0 {
614 failure = true
615 break
616 }
617 }
618 if recipientDomainResult.Summary.TotalFailureSessionCount > 0 {
619 failure = true
620 }
621
622 results := make([]tlsrptdb.TLSResult, 0, 1+len(hostResults))
623 tlsaPolicyDomains := map[string]bool{}
624 addResult := func(r tlsrpt.Result, isHost bool) {
625 var zerotype tlsrpt.PolicyType
626 if r.Policy.Type == zerotype {
627 return
628 }
629
630 // Ensure we store policy domain in unicode in database.
631 policyDomain, err := dns.ParseDomain(r.Policy.Domain)
632 if err != nil {
633 qlog.Errorx("parsing policy domain for tls result", err, mlog.Field("policydomain", r.Policy.Domain))
634 return
635 }
636
637 if r.Policy.Type == tlsrpt.TLSA {
638 tlsaPolicyDomains[policyDomain.ASCII] = true
639 }
640
641 tlsResult := tlsrptdb.TLSResult{
642 PolicyDomain: policyDomain.Name(),
643 DayUTC: dayUTC,
644 RecipientDomain: m.RecipientDomain.Domain.Name(),
645 IsHost: isHost,
646 SendReport: !m.IsTLSReport && (!m.IsDMARCReport || failure),
647 Results: []tlsrpt.Result{r},
648 }
649 results = append(results, tlsResult)
650 }
651 for _, result := range hostResults {
652 addResult(result, true)
653 }
654 // If we were delivering to a mail host directly (not a domain with MX records), we
655 // are more likely to get a TLSA policy than an STS policy. Don't potentially
656 // confuse operators with both a tlsa and no-policy-found result.
657 // todo spec: ../rfc/8460:440 an explicit no-sts-policy result would be useful.
658 if recipientDomainResult.Policy.Type != tlsrpt.NoPolicyFound || !tlsaPolicyDomains[recipientDomainResult.Policy.Domain] {
659 addResult(recipientDomainResult, false)
660 }
661
662 if len(results) > 0 {
663 err := tlsrptdb.AddTLSResults(context.Background(), results)
664 qlog.Check(err, "adding tls results to database for upcoming tlsrpt report")
665 }
666 }()
667
668 var dialer smtpclient.Dialer = &net.Dialer{}
669 if transport.Submissions != nil {
670 deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submissions, true, 465)
671 } else if transport.Submission != nil {
672 deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.Submission, false, 587)
673 } else if transport.SMTP != nil {
674 // todo future: perhaps also gather tlsrpt results for submissions.
675 deliverSubmit(cid, qlog, resolver, dialer, m, backoff, transportName, transport.SMTP, false, 25)
676 } else {
677 ourHostname := mox.Conf.Static.HostnameDomain
678 if transport.Socks != nil {
679 socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
680 if err != nil {
681 fail(qlog, m, backoff, false, dsn.NameIP{}, "", fmt.Sprintf("socks dialer: %v", err))
682 return
683 } else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
684 fail(qlog, m, backoff, false, dsn.NameIP{}, "", "socks dialer is not a contextdialer")
685 return
686 } else {
687 dialer = d
688 }
689 ourHostname = transport.Socks.Hostname
690 }
691 recipientDomainResult, hostResults = deliverDirect(cid, qlog, resolver, dialer, ourHostname, transportName, m, backoff)
692 }
693}
694
695func findRoute(attempt int, m Msg) config.Route {
696 routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
697 if r, ok := findRouteInList(attempt, m, routesAccount); ok {
698 return r
699 }
700 if r, ok := findRouteInList(attempt, m, routesDomain); ok {
701 return r
702 }
703 if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
704 return r
705 }
706 return config.Route{}
707}
708
709func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
710 for _, r := range routes {
711 if routeMatch(attempt, m, r) {
712 return r, true
713 }
714 }
715 return config.Route{}, false
716}
717
718func routeMatch(attempt int, m Msg, r config.Route) bool {
719 return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
720}
721
722func routeMatchDomain(l []string, d dns.Domain) bool {
723 if len(l) == 0 {
724 return true
725 }
726 for _, e := range l {
727 if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
728 return true
729 }
730 }
731 return false
732}
733