1// Package queue is in charge of outgoing messages, queueing them when submitted,
2// attempting a first delivery over SMTP, retrying with backoff and sending DSNs
3// for delayed or failed deliveries.
4package queue
5
6import (
7 "bytes"
8 "context"
9 "errors"
10 "fmt"
11 "io"
12 "log/slog"
13 "net"
14 "os"
15 "path/filepath"
16 "runtime/debug"
17 "slices"
18 "strings"
19 "time"
20
21 "golang.org/x/net/proxy"
22
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/prometheus/client_golang/prometheus/promauto"
25
26 "github.com/mjl-/bstore"
27
28 "github.com/mjl-/mox/config"
29 "github.com/mjl-/mox/dns"
30 "github.com/mjl-/mox/dsn"
31 "github.com/mjl-/mox/metrics"
32 "github.com/mjl-/mox/mlog"
33 "github.com/mjl-/mox/mox-"
34 "github.com/mjl-/mox/moxio"
35 "github.com/mjl-/mox/smtp"
36 "github.com/mjl-/mox/smtpclient"
37 "github.com/mjl-/mox/store"
38 "github.com/mjl-/mox/tlsrpt"
39 "github.com/mjl-/mox/tlsrptdb"
40 "github.com/mjl-/mox/webapi"
41 "github.com/mjl-/mox/webhook"
42)
43
// ErrFromID indicates a fromid was present when adding a message to the queue,
// but it wasn't unique.
var ErrFromID = errors.New("fromid not unique")
47
// Prometheus metrics for the queue, created and registered through promauto.
var (
	// metricConnection counts outgoing queue client connections, labeled by result.
	metricConnection = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "mox_queue_connection_total",
			Help: "Queue client connections, outgoing.",
		},
		[]string{
			"result", // "ok", "timeout", "canceled", "error"
		},
	)
	// metricDelivery is a histogram of the duration in seconds of an SMTP client
	// delivery attempt to a single host.
	metricDelivery = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "mox_queue_delivery_duration_seconds",
			Help:    "SMTP client delivery attempt to single host.",
			Buckets: []float64{0.01, 0.05, 0.100, 0.5, 1, 5, 10, 20, 30, 60, 120},
		},
		[]string{
			"attempt",   // Number of attempts.
			"transport", // empty for default direct delivery.
			"tlsmode",   // immediate, requiredstarttls, opportunistic, skip (from smtpclient.TLSMode), with optional +mtasts and/or +dane.
			"result",    // ok, timeout, canceled, temperror, permerror, error
		},
	)
	// metricHold is the number of queued messages that are on hold. It is always
	// set to a freshly counted value, see metricHoldUpdate.
	metricHold = promauto.NewGauge(
		prometheus.GaugeOpts{
			Name: "mox_queue_hold",
			Help: "Messages in queue that are on hold.",
		},
	)
)
78
// Pseudo-random source obtained from mox.NewPseudoRand.
// NOTE(review): presumably used to add jitter to scheduling; its users are not
// visible in this part of the file.
var jitter = mox.NewPseudoRand()

var DBTypes = []any{Msg{}, HoldRule{}, MsgRetired{}, webapi.Suppression{}, Hook{}, HookRetired{}} // Types stored in DB.
var DB *bstore.DB                                                                                   // Exported for making backups. Opened by Init, set to nil again by Shutdown.

// Allow requesting delivery starting from up to this interval (60 days) from time
// of submission.
const FutureReleaseIntervalMax = 60 * 24 * time.Hour

// Set for mox localserve, to prevent queueing.
var Localserve bool
89
// HoldRule is a set of conditions that cause a matching message to be marked as on
// hold when it is queued. All-empty conditions matches all messages, effectively
// pausing the entire queue.
//
// HoldRuleAdd derives SenderDomainStr and RecipientDomainStr from SenderDomain
// and RecipientDomain before storing the rule.
type HoldRule struct {
	ID                 int64
	Account            string // Compared against Msg.SenderAccount.
	SenderDomain       dns.Domain
	RecipientDomain    dns.Domain
	SenderDomainStr    string // Unicode.
	RecipientDomainStr string // Unicode.
}
101
102func (pr HoldRule) All() bool {
103 pr.ID = 0
104 return pr == HoldRule{}
105}
106
107func (pr HoldRule) matches(m Msg) bool {
108 return pr.All() || pr.Account == m.SenderAccount || pr.SenderDomainStr == m.SenderDomainStr || pr.RecipientDomainStr == m.RecipientDomainStr
109}
110
// Msg is a message in the queue.
//
// Use MakeMsg to make a message with fields that Add needs. Add will further set
// queueing related fields.
type Msg struct {
	ID int64

	// A message for multiple recipients will get a BaseID that is identical to the
	// first Msg.ID queued. The message contents will be identical for each recipient,
	// including MsgPrefix. If other properties are identical too, including recipient
	// domain, multiple Msgs may be delivered in a single SMTP transaction. For
	// messages with a single recipient, this field will be 0.
	BaseID int64 `bstore:"index"`

	Queued             time.Time      `bstore:"default now"`
	Hold               bool           // If set, delivery won't be attempted.
	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
	SenderDomain       dns.IPDomain
	SenderDomainStr    string         // For filtering, unicode.
	FromID             string         // For transactional messages, used to match later DSNs.
	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
	RecipientDomain    dns.IPDomain
	RecipientDomainStr string              // For filtering, unicode domain. Can also contain ip enclosed in [].
	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
	NextAttempt        time.Time           // For scheduling.
	LastAttempt        *time.Time
	Results            []MsgResult // Delivery attempt results, last entry is the most recent, see markResult.

	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
	MessageID     string // Message-ID header, including <>. Used when composing a DSN, in its References header.
	MsgPrefix     []byte // Data to send before the contents from the file, typically with headers like DKIM-Signature.
	Subject       string // For context about delivery.

	// If set, this message is a DSN and this is a version using utf-8, for the case
	// the remote MTA supports smtputf8. In this case, Size and MsgPrefix are not
	// relevant.
	DSNUTF8 []byte

	// If non-empty, the transport to use for this message. Can be set through cli or
	// admin interface. If empty (the default for a submitted message), regular routing
	// rules apply.
	Transport string

	// RequireTLS influences TLS verification during delivery.
	//
	// If nil, the recipient domain policy is followed (MTA-STS and/or DANE), falling
	// back to optional opportunistic non-verified STARTTLS.
	//
	// If RequireTLS is true (through SMTP REQUIRETLS extension or webmail submit),
	// MTA-STS or DANE is required, as well as REQUIRETLS support by the next hop
	// server.
	//
	// If RequireTLS is false (through message header "TLS-Required: No"), the recipient
	// domain's policy is ignored if it does not lead to a successful TLS connection,
	// i.e. falling back to SMTP delivery with unverified STARTTLS or plain text.
	RequireTLS *bool
	// ../rfc/8689:250

	// For DSNs, where the original FUTURERELEASE value must be included as per-message
	// field. This field should be of the form "for;" plus interval, or "until;" plus
	// utc date-time.
	FutureReleaseRequest string
	// ../rfc/4865:305

	Extra map[string]string // Extra information, for transactional email.
}
184
// MsgResult is the result (or work in progress) of a delivery attempt.
type MsgResult struct {
	Start    time.Time     // When the attempt started.
	Duration time.Duration // Time between Start and the moment the result was marked, see Msg.markResult.
	Success  bool
	Code     int    // Status code, as passed to Msg.markResult.
	Secode   string // Secondary status code, as passed to Msg.markResult.
	Error    string // Error message, or resultErrorDelivering while delivery is in progress.
	// todo: store smtp trace for failed deliveries for debugging, perhaps also for successful deliveries.
}

// Stored in MsgResult.Error while delivery is in progress. Replaced after success/error.
const resultErrorDelivering = "delivering..."
198
199// markResult updates/adds a delivery result.
200func (m *Msg) markResult(code int, secode string, errmsg string, success bool) {
201 if len(m.Results) == 0 || m.Results[len(m.Results)-1].Error != resultErrorDelivering {
202 m.Results = append(m.Results, MsgResult{Start: time.Now()})
203 }
204 result := &m.Results[len(m.Results)-1]
205 result.Duration = time.Since(result.Start)
206 result.Code = code
207 result.Secode = secode
208 result.Error = errmsg
209 result.Success = false
210}
211
212// LastResult returns the last result entry, or an empty result.
213func (m *Msg) LastResult() MsgResult {
214 if len(m.Results) == 0 {
215 return MsgResult{Start: time.Now()}
216 }
217 return m.Results[len(m.Results)-1]
218}
219
220// Sender of message as used in MAIL FROM.
221func (m Msg) Sender() smtp.Path {
222 return smtp.Path{Localpart: m.SenderLocalpart, IPDomain: m.SenderDomain}
223}
224
225// Recipient of message as used in RCPT TO.
226func (m Msg) Recipient() smtp.Path {
227 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
228}
229
230// MessagePath returns the path where the message is stored.
231func (m Msg) MessagePath() string {
232 return mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
233}
234
235// todo: store which transport (if any) was actually used in MsgResult, based on routes.
236
237// Retired returns a MsgRetired for the message, for history of deliveries.
238func (m Msg) Retired(success bool, t, keepUntil time.Time) MsgRetired {
239 return MsgRetired{
240 ID: m.ID,
241 BaseID: m.BaseID,
242 Queued: m.Queued,
243 SenderAccount: m.SenderAccount,
244 SenderLocalpart: m.SenderLocalpart,
245 SenderDomainStr: m.SenderDomainStr,
246 FromID: m.FromID,
247 RecipientLocalpart: m.RecipientLocalpart,
248 RecipientDomain: m.RecipientDomain,
249 RecipientDomainStr: m.RecipientDomainStr,
250 Attempts: m.Attempts,
251 MaxAttempts: m.MaxAttempts,
252 DialedIPs: m.DialedIPs,
253 LastAttempt: m.LastAttempt,
254 Results: m.Results,
255 Has8bit: m.Has8bit,
256 SMTPUTF8: m.SMTPUTF8,
257 IsDMARCReport: m.IsDMARCReport,
258 IsTLSReport: m.IsTLSReport,
259 Size: m.Size,
260 MessageID: m.MessageID,
261 Subject: m.Subject,
262 Transport: m.Transport,
263 RequireTLS: m.RequireTLS,
264 FutureReleaseRequest: m.FutureReleaseRequest,
265 Extra: m.Extra,
266
267 RecipientAddress: smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}.XString(true),
268 Success: success,
269 LastActivity: t,
270 KeepUntil: keepUntil,
271 }
272}
273
// MsgRetired is a message for which delivery completed, either successful,
// failed/canceled. Retired messages are only stored if so configured, and will be
// cleaned up after the configured period.
//
// Msg.Retired creates a MsgRetired, copying the fields both types share.
type MsgRetired struct {
	ID int64 // Same ID as it was as Msg.ID.

	BaseID             int64
	Queued             time.Time
	SenderAccount      string         // Failures are delivered back to this local account. Also used for routing.
	SenderLocalpart    smtp.Localpart // Should be a local user and domain.
	SenderDomainStr    string         // For filtering, unicode.
	FromID             string         `bstore:"index"` // Used to match DSNs.
	RecipientLocalpart smtp.Localpart // Typically a remote user and domain.
	RecipientDomain    dns.IPDomain
	RecipientDomainStr string              // For filtering, unicode.
	Attempts           int                 // Next attempt is based on last attempt and exponential back off based on attempts.
	MaxAttempts        int                 // Max number of attempts before giving up. If 0, then the default of 8 attempts is used instead.
	DialedIPs          map[string][]net.IP // For each host, the IPs that were dialed. Used for IP selection for later attempts.
	LastAttempt        *time.Time
	Results            []MsgResult

	Has8bit       bool   // Whether message contains bytes with high bit set, determines whether 8BITMIME SMTP extension is needed.
	SMTPUTF8      bool   // Whether message requires use of SMTPUTF8.
	IsDMARCReport bool   // Delivery failures for DMARC reports are handled differently.
	IsTLSReport   bool   // Delivery failures for TLS reports are handled differently.
	Size          int64  // Full size of message, combined MsgPrefix with contents of message file.
	MessageID     string // Used when composing a DSN, in its References header.
	Subject       string // For context about delivery.

	// Copied from the fields with the same name in Msg.
	Transport            string
	RequireTLS           *bool
	FutureReleaseRequest string

	Extra map[string]string // Extra information, for transactional email.

	LastActivity     time.Time `bstore:"index"`
	RecipientAddress string    `bstore:"index RecipientAddress+LastActivity"` // Recipient path as string, set by Msg.Retired.
	Success          bool      // Whether delivery to next hop succeeded.
	KeepUntil        time.Time `bstore:"index"`
}
314
315// Sender of message as used in MAIL FROM.
316func (m MsgRetired) Sender() (path smtp.Path, err error) {
317 path.Localpart = m.RecipientLocalpart
318 if strings.HasPrefix(m.SenderDomainStr, "[") && strings.HasSuffix(m.SenderDomainStr, "]") {
319 s := m.SenderDomainStr[1 : len(m.SenderDomainStr)-1]
320 path.IPDomain.IP = net.ParseIP(s)
321 if path.IPDomain.IP == nil {
322 err = fmt.Errorf("parsing ip address %q: %v", s, err)
323 }
324 } else {
325 path.IPDomain.Domain, err = dns.ParseDomain(m.SenderDomainStr)
326 }
327 return
328}
329
330// Recipient of message as used in RCPT TO.
331func (m MsgRetired) Recipient() smtp.Path {
332 return smtp.Path{Localpart: m.RecipientLocalpart, IPDomain: m.RecipientDomain}
333}
334
335// LastResult returns the last result entry, or an empty result.
336func (m MsgRetired) LastResult() MsgResult {
337 if len(m.Results) == 0 {
338 return MsgResult{}
339 }
340 return m.Results[len(m.Results)-1]
341}
342
343// Init opens the queue database without starting delivery.
344func Init() error {
345 qpath := mox.DataDirPath(filepath.FromSlash("queue/index.db"))
346 os.MkdirAll(filepath.Dir(qpath), 0770)
347 isNew := false
348 if _, err := os.Stat(qpath); err != nil && os.IsNotExist(err) {
349 isNew = true
350 }
351
352 var err error
353 log := mlog.New("queue", nil)
354 opts := bstore.Options{Timeout: 5 * time.Second, Perm: 0660, RegisterLogger: log.Logger}
355 DB, err = bstore.Open(mox.Shutdown, qpath, &opts, DBTypes...)
356 if err == nil {
357 err = DB.Read(mox.Shutdown, func(tx *bstore.Tx) error {
358 return metricHoldUpdate(tx)
359 })
360 }
361 if err != nil {
362 if isNew {
363 os.Remove(qpath)
364 }
365 return fmt.Errorf("open queue database: %s", err)
366 }
367 return nil
368}
369
370// When we update the gauge, we just get the full current value, not try to account
371// for adds/removes.
372func metricHoldUpdate(tx *bstore.Tx) error {
373 count, err := bstore.QueryTx[Msg](tx).FilterNonzero(Msg{Hold: true}).Count()
374 if err != nil {
375 return fmt.Errorf("querying messages on hold for metric: %v", err)
376 }
377 metricHold.Set(float64(count))
378 return nil
379}
380
381// Shutdown closes the queue database. The delivery process isn't stopped. For tests only.
382func Shutdown() {
383 err := DB.Close()
384 if err != nil {
385 mlog.New("queue", nil).Errorx("closing queue db", err)
386 }
387 DB = nil
388}
389
// todo: the filtering & sorting can use improvements. too much duplicated code (variants between {Msg,Hook}{,Retired}). Sort has pagination fields, some untyped.
391
// Filter filters messages to list or operate on. Used by admin web interface
// and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all messages.
type Filter struct {
	Max         int     // If non-zero, limit on the number of messages selected.
	IDs         []int64 // If non-empty, only messages with these IDs are selected.
	Account     string  // Matched against Msg.SenderAccount.
	From        string  // Substring to find in the sender address.
	To          string  // Substring to find in the recipient address.
	Hold        *bool   // If not nil, the required value of Msg.Hold.
	Submitted   string  // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	NextAttempt string  // ">$duration" or "<$duration", also with "now" for duration.
	Transport   *string // If not nil, the required value of Msg.Transport, possibly empty.
}
408
409func (f Filter) apply(q *bstore.Query[Msg]) error {
410 if len(f.IDs) > 0 {
411 q.FilterIDs(f.IDs)
412 }
413 applyTime := func(field string, s string) error {
414 orig := s
415 var before bool
416 if strings.HasPrefix(s, "<") {
417 before = true
418 } else if !strings.HasPrefix(s, ">") {
419 return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
420 }
421 s = strings.TrimSpace(s[1:])
422 var t time.Time
423 if s == "now" {
424 t = time.Now()
425 } else if d, err := time.ParseDuration(s); err != nil {
426 return fmt.Errorf("parsing duration %q: %v", orig, err)
427 } else {
428 t = time.Now().Add(d)
429 }
430 if before {
431 q.FilterLess(field, t)
432 } else {
433 q.FilterGreater(field, t)
434 }
435 return nil
436 }
437 if f.Hold != nil {
438 q.FilterEqual("Hold", *f.Hold)
439 }
440 if f.Submitted != "" {
441 if err := applyTime("Queued", f.Submitted); err != nil {
442 return fmt.Errorf("applying filter for submitted: %v", err)
443 }
444 }
445 if f.NextAttempt != "" {
446 if err := applyTime("NextAttempt", f.NextAttempt); err != nil {
447 return fmt.Errorf("applying filter for next attempt: %v", err)
448 }
449 }
450 if f.Account != "" {
451 q.FilterNonzero(Msg{SenderAccount: f.Account})
452 }
453 if f.Transport != nil {
454 q.FilterEqual("Transport", *f.Transport)
455 }
456 if f.From != "" || f.To != "" {
457 q.FilterFn(func(m Msg) bool {
458 return f.From != "" && strings.Contains(m.Sender().XString(true), f.From) || f.To != "" && strings.Contains(m.Recipient().XString(true), f.To)
459 })
460 }
461 if f.Max != 0 {
462 q.Limit(f.Max)
463 }
464 return nil
465}
466
// Sort specifies the order in which to return messages, and optionally the
// position to continue from (pagination), through LastID and Last.
type Sort struct {
	Field  string // "Queued" or "NextAttempt"/"".
	LastID int64  // If > 0, we return objects beyond this, less/greater depending on Asc.
	Last   any    // Value of Field for last object. Must be set iff LastID is set. Expected to be a string with an RFC 3339 timestamp.
	Asc    bool   // Ascending, or descending.
}
473
474func (s Sort) apply(q *bstore.Query[Msg]) error {
475 switch s.Field {
476 case "", "NextAttempt":
477 s.Field = "NextAttempt"
478 case "Queued":
479 s.Field = "Queued"
480 default:
481 return fmt.Errorf("unknown sort order field %q", s.Field)
482 }
483
484 if s.LastID > 0 {
485 ls, ok := s.Last.(string)
486 if !ok {
487 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
488 }
489 last, err := time.Parse(time.RFC3339Nano, ls)
490 if err != nil {
491 last, err = time.Parse(time.RFC3339, ls)
492 }
493 if err != nil {
494 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
495 }
496 q.FilterNotEqual("ID", s.LastID)
497 var fieldEqual func(m Msg) bool
498 if s.Field == "NextAttempt" {
499 fieldEqual = func(m Msg) bool { return m.NextAttempt.Equal(last) }
500 } else {
501 fieldEqual = func(m Msg) bool { return m.Queued.Equal(last) }
502 }
503 if s.Asc {
504 q.FilterGreaterEqual(s.Field, last)
505 q.FilterFn(func(m Msg) bool {
506 return !fieldEqual(m) || m.ID > s.LastID
507 })
508 } else {
509 q.FilterLessEqual(s.Field, last)
510 q.FilterFn(func(m Msg) bool {
511 return !fieldEqual(m) || m.ID < s.LastID
512 })
513 }
514 }
515 if s.Asc {
516 q.SortAsc(s.Field, "ID")
517 } else {
518 q.SortDesc(s.Field, "ID")
519 }
520 return nil
521}
522
523// List returns max 100 messages matching filter in the delivery queue.
524// By default, orders by next delivery attempt.
525func List(ctx context.Context, filter Filter, sort Sort) ([]Msg, error) {
526 q := bstore.QueryDB[Msg](ctx, DB)
527 if err := filter.apply(q); err != nil {
528 return nil, err
529 }
530 if err := sort.apply(q); err != nil {
531 return nil, err
532 }
533 qmsgs, err := q.List()
534 if err != nil {
535 return nil, err
536 }
537 return qmsgs, nil
538}
539
540// Count returns the number of messages in the delivery queue.
541func Count(ctx context.Context) (int, error) {
542 return bstore.QueryDB[Msg](ctx, DB).Count()
543}
544
545// HoldRuleList returns all hold rules.
546func HoldRuleList(ctx context.Context) ([]HoldRule, error) {
547 return bstore.QueryDB[HoldRule](ctx, DB).List()
548}
549
550// HoldRuleAdd adds a new hold rule causing newly submitted messages to be marked
551// as "on hold", and existing matching messages too.
552func HoldRuleAdd(ctx context.Context, log mlog.Log, hr HoldRule) (HoldRule, error) {
553 var n int
554 err := DB.Write(ctx, func(tx *bstore.Tx) error {
555 hr.ID = 0
556 hr.SenderDomainStr = hr.SenderDomain.Name()
557 hr.RecipientDomainStr = hr.RecipientDomain.Name()
558 if err := tx.Insert(&hr); err != nil {
559 return err
560 }
561 log.Info("adding hold rule", slog.Any("holdrule", hr))
562
563 q := bstore.QueryTx[Msg](tx)
564 if !hr.All() {
565 q.FilterNonzero(Msg{
566 SenderAccount: hr.Account,
567 SenderDomainStr: hr.SenderDomainStr,
568 RecipientDomainStr: hr.RecipientDomainStr,
569 })
570 }
571 var err error
572 n, err = q.UpdateField("Hold", true)
573 if err != nil {
574 return fmt.Errorf("marking existing matching messages in queue on hold: %v", err)
575 }
576 return metricHoldUpdate(tx)
577 })
578 if err != nil {
579 return HoldRule{}, err
580 }
581 log.Info("marked messages in queue as on hold", slog.Int("messages", n))
582 msgqueueKick()
583 return hr, nil
584}
585
586// HoldRuleRemove removes a hold rule. The Hold field of existing messages are not
587// changed.
588func HoldRuleRemove(ctx context.Context, log mlog.Log, holdRuleID int64) error {
589 return DB.Write(ctx, func(tx *bstore.Tx) error {
590 hr := HoldRule{ID: holdRuleID}
591 if err := tx.Get(&hr); err != nil {
592 return err
593 }
594 log.Info("removing hold rule", slog.Any("holdrule", hr))
595 return tx.Delete(HoldRule{ID: holdRuleID})
596 })
597}
598
599// MakeMsg is a convenience function that sets the commonly used fields for a Msg.
600// messageID should include <>.
601func MakeMsg(sender, recipient smtp.Path, has8bit, smtputf8 bool, size int64, messageID string, prefix []byte, requireTLS *bool, next time.Time, subject string) Msg {
602 return Msg{
603 SenderLocalpart: sender.Localpart,
604 SenderDomain: sender.IPDomain,
605 RecipientLocalpart: recipient.Localpart,
606 RecipientDomain: recipient.IPDomain,
607 Has8bit: has8bit,
608 SMTPUTF8: smtputf8,
609 Size: size,
610 MessageID: messageID,
611 MsgPrefix: prefix,
612 Subject: subject,
613 RequireTLS: requireTLS,
614 Queued: time.Now(),
615 NextAttempt: next,
616 }
617}
618
619// Add one or more new messages to the queue. If the sender paths and MsgPrefix are
620// identical, they'll get the same BaseID, so they can be delivered in a single
621// SMTP transaction, with a single DATA command, but may be split into multiple
622// transactions if errors/limits are encountered. The queue is kicked immediately
623// to start a first delivery attempt.
624//
625// ID of the messagse must be 0 and will be set after inserting in the queue.
626//
627// Add sets derived fields like SenderDomainStr and RecipientDomainStr, and fields
628// related to queueing, such as Queued, NextAttempt.
629func Add(ctx context.Context, log mlog.Log, senderAccount string, msgFile *os.File, qml ...Msg) error {
630 if len(qml) == 0 {
631 return fmt.Errorf("must queue at least one message")
632 }
633
634 base := true
635
636 for i, qm := range qml {
637 if qm.ID != 0 {
638 return fmt.Errorf("id of queued messages must be 0")
639 }
640 // Sanity check, internal consistency.
641 qml[i].SenderDomainStr = formatIPDomain(qm.SenderDomain)
642 qml[i].RecipientDomainStr = formatIPDomain(qm.RecipientDomain)
643 if base && i > 0 && qm.Sender().String() != qml[0].Sender().String() || !bytes.Equal(qm.MsgPrefix, qml[0].MsgPrefix) {
644 base = false
645 }
646 }
647
648 tx, err := DB.Begin(ctx, true)
649 if err != nil {
650 return fmt.Errorf("begin transaction: %w", err)
651 }
652 defer func() {
653 if tx != nil {
654 if err := tx.Rollback(); err != nil {
655 log.Errorx("rollback for queue", err)
656 }
657 }
658 }()
659
660 // Mark messages Hold if they match a hold rule.
661 holdRules, err := bstore.QueryTx[HoldRule](tx).List()
662 if err != nil {
663 return fmt.Errorf("getting queue hold rules")
664 }
665
666 // Insert messages into queue. If multiple messages are to be delivered in a single
667 // transaction, they all get a non-zero BaseID that is the Msg.ID of the first
668 // message inserted.
669 var baseID int64
670 for i := range qml {
671 // FromIDs must be unique if present. We don't have a unique index because values
672 // can be the empty string. We check in both Msg and MsgRetired, both are relevant
673 // for uniquely identifying a message sent in the past.
674 if fromID := qml[i].FromID; fromID != "" {
675 if exists, err := bstore.QueryTx[Msg](tx).FilterNonzero(Msg{FromID: fromID}).Exists(); err != nil {
676 return fmt.Errorf("looking up fromid: %v", err)
677 } else if exists {
678 return fmt.Errorf("%w: fromid %q already present in message queue", ErrFromID, fromID)
679 }
680 if exists, err := bstore.QueryTx[MsgRetired](tx).FilterNonzero(MsgRetired{FromID: fromID}).Exists(); err != nil {
681 return fmt.Errorf("looking up fromid: %v", err)
682 } else if exists {
683 return fmt.Errorf("%w: fromid %q already present in retired message queue", ErrFromID, fromID)
684 }
685 }
686
687 qml[i].SenderAccount = senderAccount
688 qml[i].BaseID = baseID
689 for _, hr := range holdRules {
690 if hr.matches(qml[i]) {
691 qml[i].Hold = true
692 break
693 }
694 }
695 if err := tx.Insert(&qml[i]); err != nil {
696 return err
697 }
698 if base && i == 0 && len(qml) > 1 {
699 baseID = qml[i].ID
700 qml[i].BaseID = baseID
701 if err := tx.Update(&qml[i]); err != nil {
702 return err
703 }
704 }
705 }
706
707 var paths []string
708 defer func() {
709 for _, p := range paths {
710 err := os.Remove(p)
711 log.Check(err, "removing destination message file for queue", slog.String("path", p))
712 }
713 }()
714
715 for _, qm := range qml {
716 dst := qm.MessagePath()
717 paths = append(paths, dst)
718 dstDir := filepath.Dir(dst)
719 os.MkdirAll(dstDir, 0770)
720 if err := moxio.LinkOrCopy(log, dst, msgFile.Name(), nil, true); err != nil {
721 return fmt.Errorf("linking/copying message to new file: %s", err)
722 } else if err := moxio.SyncDir(log, dstDir); err != nil {
723 return fmt.Errorf("sync directory: %v", err)
724 }
725 }
726
727 for _, m := range qml {
728 if m.Hold {
729 if err := metricHoldUpdate(tx); err != nil {
730 return err
731 }
732 break
733 }
734 }
735
736 if err := tx.Commit(); err != nil {
737 return fmt.Errorf("commit transaction: %s", err)
738 }
739 tx = nil
740 paths = nil
741
742 msgqueueKick()
743
744 return nil
745}
746
747func formatIPDomain(d dns.IPDomain) string {
748 if len(d.IP) > 0 {
749 return "[" + d.IP.String() + "]"
750 }
751 return d.Domain.Name()
752}
753
var (
	// msgqueue is a signal channel with a buffer of one element. msgqueueKick
	// does a non-blocking send on it.
	msgqueue = make(chan struct{}, 1)
	// deliveryResults is a channel of strings with a buffer of one element.
	// NOTE(review): its senders and receivers are not visible in this part of
	// the file.
	deliveryResults = make(chan string, 1)
)
758
// kick signals both queues, by calling msgqueueKick and hookqueueKick.
func kick() {
	msgqueueKick()
	hookqueueKick()
}
763
// msgqueueKick signals the message queue by sending on msgqueue, without
// blocking: if the channel buffer already holds a signal, nothing is sent.
func msgqueueKick() {
	select {
	case msgqueue <- struct{}{}:
	default:
		// A signal is already pending.
	}
}
770
771// NextAttemptAdd adds a duration to the NextAttempt for all matching messages, and
772// kicks the queue.
773func NextAttemptAdd(ctx context.Context, filter Filter, d time.Duration) (affected int, err error) {
774 err = DB.Write(ctx, func(tx *bstore.Tx) error {
775 q := bstore.QueryTx[Msg](tx)
776 if err := filter.apply(q); err != nil {
777 return err
778 }
779 msgs, err := q.List()
780 if err != nil {
781 return fmt.Errorf("listing matching messages: %v", err)
782 }
783 for _, m := range msgs {
784 m.NextAttempt = m.NextAttempt.Add(d)
785 if err := tx.Update(&m); err != nil {
786 return err
787 }
788 }
789 affected = len(msgs)
790 return nil
791 })
792 if err != nil {
793 return 0, err
794 }
795 msgqueueKick()
796 return affected, nil
797}
798
799// NextAttemptSet sets NextAttempt for all matching messages to a new time, and
800// kicks the queue.
801func NextAttemptSet(ctx context.Context, filter Filter, t time.Time) (affected int, err error) {
802 q := bstore.QueryDB[Msg](ctx, DB)
803 if err := filter.apply(q); err != nil {
804 return 0, err
805 }
806 n, err := q.UpdateNonzero(Msg{NextAttempt: t})
807 if err != nil {
808 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
809 }
810 msgqueueKick()
811 return n, nil
812}
813
814// HoldSet sets Hold for all matching messages and kicks the queue.
815func HoldSet(ctx context.Context, filter Filter, hold bool) (affected int, err error) {
816 err = DB.Write(ctx, func(tx *bstore.Tx) error {
817 q := bstore.QueryTx[Msg](tx)
818 if err := filter.apply(q); err != nil {
819 return err
820 }
821 n, err := q.UpdateFields(map[string]any{"Hold": hold})
822 if err != nil {
823 return fmt.Errorf("selecting and updating messages in queue: %v", err)
824 }
825 affected = n
826 return metricHoldUpdate(tx)
827 })
828 if err != nil {
829 return 0, err
830 }
831 msgqueueKick()
832 return affected, nil
833}
834
835// TransportSet changes the transport to use for the matching messages.
836func TransportSet(ctx context.Context, filter Filter, transport string) (affected int, err error) {
837 q := bstore.QueryDB[Msg](ctx, DB)
838 if err := filter.apply(q); err != nil {
839 return 0, err
840 }
841 n, err := q.UpdateFields(map[string]any{"Transport": transport})
842 if err != nil {
843 return 0, fmt.Errorf("selecting and updating messages in queue: %v", err)
844 }
845 msgqueueKick()
846 return n, nil
847}
848
849// Fail marks matching messages as failed for delivery, delivers a DSN to the
850// sender, and sends a webhook.
851//
852// Returns number of messages removed, which can be non-zero even in case of an
853// error.
854func Fail(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
855 return failDrop(ctx, log, f, true)
856}
857
858// Drop removes matching messages from the queue. Messages are added as retired
859// message, webhooks with the "canceled" event are queued.
860//
861// Returns number of messages removed, which can be non-zero even in case of an
862// error.
863func Drop(ctx context.Context, log mlog.Log, f Filter) (affected int, err error) {
864 return failDrop(ctx, log, f, false)
865}
866
// failDrop implements Fail (fail is true) and Drop (fail is false).
//
// In a single transaction, the messages matching filter are listed, a result
// "delivery canceled by admin" is added to each, and the messages are passed to
// retireMsgs with event "failed" or "canceled". If fail is set, deliverDSNFailure
// is called for each message first. After the transaction, the message files
// are removed and both queues are kicked.
//
// It returns the number of matching messages. If removing message files fails,
// both that number and an error are returned.
func failDrop(ctx context.Context, log mlog.Log, filter Filter, fail bool) (affected int, err error) {
	var msgs []Msg
	err = DB.Write(ctx, func(tx *bstore.Tx) error {
		q := bstore.QueryTx[Msg](tx)
		if err := filter.apply(q); err != nil {
			return err
		}
		var err error
		msgs, err = q.List()
		if err != nil {
			return fmt.Errorf("getting messages to delete: %v", err)
		}

		// Nothing to do.
		if len(msgs) == 0 {
			return nil
		}

		now := time.Now()
		// Zero value: there is no remote MTA involved in a cancellation.
		var remoteMTA dsn.NameIP
		for i := range msgs {
			result := MsgResult{
				Start: now,
				Error: "delivery canceled by admin",
			}
			msgs[i].Results = append(msgs[i].Results, result)
			if fail {
				// Ensure LastAttempt is set before composing the DSN.
				if msgs[i].LastAttempt == nil {
					msgs[i].LastAttempt = &now
				}
				deliverDSNFailure(log, msgs[i], remoteMTA, "", result.Error, nil)
			}
		}
		event := webhook.EventCanceled
		if fail {
			event = webhook.EventFailed
		}
		if err := retireMsgs(log, tx, event, 0, "", nil, msgs...); err != nil {
			return fmt.Errorf("removing queue messages from database: %w", err)
		}
		return metricHoldUpdate(tx)
	})
	if err != nil {
		return 0, err
	}
	// Message files are only removed after the transaction completed successfully.
	if len(msgs) > 0 {
		if err := removeMsgsFS(log, msgs...); err != nil {
			return len(msgs), fmt.Errorf("removing queue messages from file system: %w", err)
		}
	}
	kick()
	return len(msgs), nil
}
919
920// RequireTLSSet updates the RequireTLS field of matching messages.
921func RequireTLSSet(ctx context.Context, filter Filter, requireTLS *bool) (affected int, err error) {
922 q := bstore.QueryDB[Msg](ctx, DB)
923 if err := filter.apply(q); err != nil {
924 return 0, err
925 }
926 n, err := q.UpdateFields(map[string]any{"RequireTLS": requireTLS})
927 msgqueueKick()
928 return n, err
929}
930
// RetiredFilter filters retired messages to list or operate on. Used by admin
// web interface and cli.
//
// Only non-empty/non-zero values are applied to the filter. Leaving all fields
// empty/zero matches all messages.
type RetiredFilter struct {
	Max          int     // If non-zero, limit on the number of messages selected.
	IDs          []int64 // If non-empty, only messages with these IDs are selected.
	Account      string  // Matched against MsgRetired.SenderAccount.
	From         string  // Substring to find in the sender address.
	To           string  // Substring to find in the recipient address.
	Submitted    string  // Whether submitted before/after a time relative to now. ">$duration" or "<$duration", also with "now" for duration.
	LastActivity string  // ">$duration" or "<$duration", also with "now" for duration.
	Transport    *string // If not nil, the required value of MsgRetired.Transport, possibly empty.
	Success      *bool   // If not nil, the required value of MsgRetired.Success.
}
947
948func (f RetiredFilter) apply(q *bstore.Query[MsgRetired]) error {
949 if len(f.IDs) > 0 {
950 q.FilterIDs(f.IDs)
951 }
952 applyTime := func(field string, s string) error {
953 orig := s
954 var before bool
955 if strings.HasPrefix(s, "<") {
956 before = true
957 } else if !strings.HasPrefix(s, ">") {
958 return fmt.Errorf(`must start with "<" for before or ">" for after a duration`)
959 }
960 s = strings.TrimSpace(s[1:])
961 var t time.Time
962 if s == "now" {
963 t = time.Now()
964 } else if d, err := time.ParseDuration(s); err != nil {
965 return fmt.Errorf("parsing duration %q: %v", orig, err)
966 } else {
967 t = time.Now().Add(d)
968 }
969 if before {
970 q.FilterLess(field, t)
971 } else {
972 q.FilterGreater(field, t)
973 }
974 return nil
975 }
976 if f.Submitted != "" {
977 if err := applyTime("Queued", f.Submitted); err != nil {
978 return fmt.Errorf("applying filter for submitted: %v", err)
979 }
980 }
981 if f.LastActivity != "" {
982 if err := applyTime("LastActivity", f.LastActivity); err != nil {
983 return fmt.Errorf("applying filter for last activity: %v", err)
984 }
985 }
986 if f.Account != "" {
987 q.FilterNonzero(MsgRetired{SenderAccount: f.Account})
988 }
989 if f.Transport != nil {
990 q.FilterEqual("Transport", *f.Transport)
991 }
992 if f.From != "" || f.To != "" {
993 q.FilterFn(func(m MsgRetired) bool {
994 return f.From != "" && strings.Contains(m.SenderLocalpart.String()+"@"+m.SenderDomainStr, f.From) || f.To != "" && strings.Contains(m.Recipient().XString(true), f.To)
995 })
996 }
997 if f.Success != nil {
998 q.FilterEqual("Success", *f.Success)
999 }
1000 if f.Max != 0 {
1001 q.Limit(f.Max)
1002 }
1003 return nil
1004}
1005
// RetiredSort describes the order in which retired messages are returned, and
// optionally the last object already seen, so a listing can continue beyond it.
type RetiredSort struct {
	// Field to sort on: "Queued" or "LastActivity"/"".
	Field string

	// If > 0, we return objects beyond this, less/greater depending on Asc.
	LastID int64

	// Value of Field for last object. Must be set iff LastID is set. Expected to
	// be a string holding a time in RFC 3339 format.
	Last any

	// Ascending, or descending.
	Asc bool
}
1012
1013func (s RetiredSort) apply(q *bstore.Query[MsgRetired]) error {
1014 switch s.Field {
1015 case "", "LastActivity":
1016 s.Field = "LastActivity"
1017 case "Queued":
1018 s.Field = "Queued"
1019 default:
1020 return fmt.Errorf("unknown sort order field %q", s.Field)
1021 }
1022
1023 if s.LastID > 0 {
1024 ls, ok := s.Last.(string)
1025 if !ok {
1026 return fmt.Errorf("last should be string with time, not %T %q", s.Last, s.Last)
1027 }
1028 last, err := time.Parse(time.RFC3339Nano, ls)
1029 if err != nil {
1030 last, err = time.Parse(time.RFC3339, ls)
1031 }
1032 if err != nil {
1033 return fmt.Errorf("parsing last %q as time: %v", s.Last, err)
1034 }
1035 q.FilterNotEqual("ID", s.LastID)
1036 var fieldEqual func(m MsgRetired) bool
1037 if s.Field == "LastActivity" {
1038 fieldEqual = func(m MsgRetired) bool { return m.LastActivity.Equal(last) }
1039 } else {
1040 fieldEqual = func(m MsgRetired) bool { return m.Queued.Equal(last) }
1041 }
1042 if s.Asc {
1043 q.FilterGreaterEqual(s.Field, last)
1044 q.FilterFn(func(mr MsgRetired) bool {
1045 return !fieldEqual(mr) || mr.ID > s.LastID
1046 })
1047 } else {
1048 q.FilterLessEqual(s.Field, last)
1049 q.FilterFn(func(mr MsgRetired) bool {
1050 return !fieldEqual(mr) || mr.ID < s.LastID
1051 })
1052 }
1053 }
1054 if s.Asc {
1055 q.SortAsc(s.Field, "ID")
1056 } else {
1057 q.SortDesc(s.Field, "ID")
1058 }
1059 return nil
1060}
1061
1062// RetiredList returns retired messages.
1063func RetiredList(ctx context.Context, filter RetiredFilter, sort RetiredSort) ([]MsgRetired, error) {
1064 q := bstore.QueryDB[MsgRetired](ctx, DB)
1065 if err := filter.apply(q); err != nil {
1066 return nil, err
1067 }
1068 if err := sort.apply(q); err != nil {
1069 return nil, err
1070 }
1071 return q.List()
1072}
1073
// ReadReaderAtCloser is a reader that can be read sequentially, read at a given
// offset, and closed. It is the type returned by OpenMessage.
type ReadReaderAtCloser interface {
	io.ReaderAt
	io.ReadCloser
}
1078
1079// OpenMessage opens a message present in the queue.
1080func OpenMessage(ctx context.Context, id int64) (ReadReaderAtCloser, error) {
1081 qm := Msg{ID: id}
1082 err := DB.Get(ctx, &qm)
1083 if err != nil {
1084 return nil, err
1085 }
1086 f, err := os.Open(qm.MessagePath())
1087 if err != nil {
1088 return nil, fmt.Errorf("open message file: %s", err)
1089 }
1090 r := store.FileMsgReader(qm.MsgPrefix, f)
1091 return r, err
1092}
1093
const (
	// maxConcurrentDeliveries is the number of busy recipient domains at which
	// startQueue stops launching new deliveries, and the maximum number of
	// messages launchWork fetches in one go.
	maxConcurrentDeliveries = 10

	// maxConcurrentHookDeliveries is presumably the equivalent limit for webhook
	// deliveries; its use is outside this part of the file.
	maxConcurrentHookDeliveries = 10
)
1096
1097// Start opens the database by calling Init, then starts the delivery and cleanup
1098// processes.
1099func Start(resolver dns.Resolver, done chan struct{}) error {
1100 if err := Init(); err != nil {
1101 return err
1102 }
1103
1104 go startQueue(resolver, done)
1105 go startHookQueue(done)
1106
1107 go cleanupMsgRetired(done)
1108 go cleanupHookRetired(done)
1109
1110 return nil
1111}
1112
1113func cleanupMsgRetired(done chan struct{}) {
1114 log := mlog.New("queue", nil)
1115
1116 defer func() {
1117 x := recover()
1118 if x != nil {
1119 log.Error("unhandled panic in cleanupMsgRetired", slog.Any("x", x))
1120 debug.PrintStack()
1121 metrics.PanicInc(metrics.Queue)
1122 }
1123 }()
1124
1125 timer := time.NewTimer(3 * time.Second)
1126 for {
1127 select {
1128 case <-mox.Shutdown.Done():
1129 done <- struct{}{}
1130 return
1131 case <-timer.C:
1132 }
1133
1134 cleanupMsgRetiredSingle(log)
1135 timer.Reset(time.Hour)
1136 }
1137}
1138
1139func cleanupMsgRetiredSingle(log mlog.Log) {
1140 n, err := bstore.QueryDB[MsgRetired](mox.Shutdown, DB).FilterLess("KeepUntil", time.Now()).Delete()
1141 log.Check(err, "removing old retired messages")
1142 if n > 0 {
1143 log.Debug("cleaned up retired messages", slog.Int("count", n))
1144 }
1145}
1146
1147func startQueue(resolver dns.Resolver, done chan struct{}) {
1148 // High-level delivery strategy advice: ../rfc/5321:3685
1149 log := mlog.New("queue", nil)
1150
1151 // Map keys are either dns.Domain.Name()'s, or string-formatted IP addresses.
1152 busyDomains := map[string]struct{}{}
1153
1154 timer := time.NewTimer(0)
1155
1156 for {
1157 select {
1158 case <-mox.Shutdown.Done():
1159 for len(busyDomains) > 0 {
1160 domain := <-deliveryResults
1161 delete(busyDomains, domain)
1162 }
1163 done <- struct{}{}
1164 return
1165 case <-msgqueue:
1166 case <-timer.C:
1167 case domain := <-deliveryResults:
1168 delete(busyDomains, domain)
1169 }
1170
1171 if len(busyDomains) >= maxConcurrentDeliveries {
1172 continue
1173 }
1174
1175 launchWork(log, resolver, busyDomains)
1176 timer.Reset(nextWork(mox.Shutdown, log, busyDomains))
1177 }
1178}
1179
1180func nextWork(ctx context.Context, log mlog.Log, busyDomains map[string]struct{}) time.Duration {
1181 q := bstore.QueryDB[Msg](ctx, DB)
1182 if len(busyDomains) > 0 {
1183 var doms []any
1184 for d := range busyDomains {
1185 doms = append(doms, d)
1186 }
1187 q.FilterNotEqual("RecipientDomainStr", doms...)
1188 }
1189 q.FilterEqual("Hold", false)
1190 q.SortAsc("NextAttempt")
1191 q.Limit(1)
1192 qm, err := q.Get()
1193 if err == bstore.ErrAbsent {
1194 return 24 * time.Hour
1195 } else if err != nil {
1196 log.Errorx("finding time for next delivery attempt", err)
1197 return 1 * time.Minute
1198 }
1199 return time.Until(qm.NextAttempt)
1200}
1201
1202func launchWork(log mlog.Log, resolver dns.Resolver, busyDomains map[string]struct{}) int {
1203 q := bstore.QueryDB[Msg](mox.Shutdown, DB)
1204 q.FilterLessEqual("NextAttempt", time.Now())
1205 q.FilterEqual("Hold", false)
1206 q.SortAsc("NextAttempt")
1207 q.Limit(maxConcurrentDeliveries)
1208 if len(busyDomains) > 0 {
1209 var doms []any
1210 for d := range busyDomains {
1211 doms = append(doms, d)
1212 }
1213 q.FilterNotEqual("RecipientDomainStr", doms...)
1214 }
1215 var msgs []Msg
1216 seen := map[string]bool{}
1217 err := q.ForEach(func(m Msg) error {
1218 dom := m.RecipientDomainStr
1219 if _, ok := busyDomains[dom]; !ok && !seen[dom] {
1220 seen[dom] = true
1221 msgs = append(msgs, m)
1222 }
1223 return nil
1224 })
1225 if err != nil {
1226 log.Errorx("querying for work in queue", err)
1227 mox.Sleep(mox.Shutdown, 1*time.Second)
1228 return -1
1229 }
1230
1231 for _, m := range msgs {
1232 busyDomains[m.RecipientDomainStr] = struct{}{}
1233 go deliver(log, resolver, m)
1234 }
1235 return len(msgs)
1236}
1237
1238// todo future: we may consider keeping message files around for a while after retiring. especially for failures to deliver. to inspect what exactly wasn't delivered.
1239
1240func removeMsgsFS(log mlog.Log, msgs ...Msg) error {
1241 var errs []string
1242 for _, m := range msgs {
1243 p := mox.DataDirPath(filepath.Join("queue", store.MessagePath(m.ID)))
1244 if err := os.Remove(p); err != nil {
1245 errs = append(errs, fmt.Sprintf("%s: %v", p, err))
1246 }
1247 }
1248 if len(errs) > 0 {
1249 return fmt.Errorf("removing message files from queue: %s", strings.Join(errs, "; "))
1250 }
1251 return nil
1252}
1253
1254// Move one or more messages to retire list or remove it. Webhooks are scheduled.
1255// IDs of msgs in suppressedMsgIDs caused a suppression to be added.
1256//
1257// Callers should update Msg.Results before calling.
1258//
1259// Callers must remove the messages from the file system afterwards, see
1260// removeMsgsFS. Callers must also kick the message and webhook queues.
1261func retireMsgs(log mlog.Log, tx *bstore.Tx, event webhook.OutgoingEvent, code int, secode string, suppressedMsgIDs []int64, msgs ...Msg) error {
1262 now := time.Now()
1263
1264 var hooks []Hook
1265 m0 := msgs[0]
1266 accConf, ok := mox.Conf.Account(m0.SenderAccount)
1267 var hookURL string
1268 if accConf.OutgoingWebhook != nil {
1269 hookURL = accConf.OutgoingWebhook.URL
1270 }
1271 log.Debug("retiring messages from queue", slog.Any("event", event), slog.String("account", m0.SenderAccount), slog.Bool("ok", ok), slog.String("webhookurl", hookURL))
1272 if hookURL != "" && (len(accConf.OutgoingWebhook.Events) == 0 || slices.Contains(accConf.OutgoingWebhook.Events, string(event))) {
1273 for _, m := range msgs {
1274 suppressing := slices.Contains(suppressedMsgIDs, m.ID)
1275 h, err := hookCompose(m, hookURL, accConf.OutgoingWebhook.Authorization, event, suppressing, code, secode)
1276 if err != nil {
1277 log.Errorx("composing webhooks while retiring messages from queue, not queueing hook for message", err, slog.Int64("msgid", m.ID), slog.Any("recipient", m.Recipient()))
1278 } else {
1279 hooks = append(hooks, h)
1280 }
1281 }
1282 }
1283
1284 msgKeep := 24 * 7 * time.Hour
1285 hookKeep := 24 * 7 * time.Hour
1286 if ok {
1287 msgKeep = accConf.KeepRetiredMessagePeriod
1288 hookKeep = accConf.KeepRetiredWebhookPeriod
1289 }
1290
1291 for _, m := range msgs {
1292 if err := tx.Delete(&m); err != nil {
1293 return err
1294 }
1295 }
1296 if msgKeep > 0 {
1297 for _, m := range msgs {
1298 rm := m.Retired(event == webhook.EventDelivered, now, now.Add(msgKeep))
1299 if err := tx.Insert(&rm); err != nil {
1300 return err
1301 }
1302 }
1303 }
1304
1305 for i := range hooks {
1306 if err := hookInsert(tx, &hooks[i], now, hookKeep); err != nil {
1307 return fmt.Errorf("enqueueing webhooks while retiring messages from queue: %v", err)
1308 }
1309 }
1310
1311 if len(hooks) > 0 {
1312 for _, h := range hooks {
1313 log.Debug("queued webhook while retiring message from queue", h.attrs()...)
1314 }
1315 hookqueueKick()
1316 }
1317 return nil
1318}
1319
1320// deliver attempts to deliver a message.
1321// The queue is updated, either by removing a delivered or permanently failed
1322// message, or updating the time for the next attempt. A DSN may be sent.
1323func deliver(log mlog.Log, resolver dns.Resolver, m0 Msg) {
1324 ctx := mox.Shutdown
1325
1326 qlog := log.WithCid(mox.Cid()).With(
1327 slog.Any("from", m0.Sender()),
1328 slog.Int("attempts", m0.Attempts))
1329
1330 defer func() {
1331 deliveryResults <- formatIPDomain(m0.RecipientDomain)
1332
1333 x := recover()
1334 if x != nil {
1335 qlog.Error("deliver panic", slog.Any("panic", x), slog.Int64("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
1336 debug.PrintStack()
1337 metrics.PanicInc(metrics.Queue)
1338 }
1339 }()
1340
1341 // We'll use a single transaction for the various checks, committing as soon as
1342 // we're done with it.
1343 xtx, err := DB.Begin(mox.Shutdown, true)
1344 if err != nil {
1345 qlog.Errorx("transaction for gathering messages to deliver", err)
1346 return
1347 }
1348 defer func() {
1349 if xtx != nil {
1350 err := xtx.Rollback()
1351 qlog.Check(err, "rolling back transaction after error delivering")
1352 }
1353 }()
1354
1355 // We register this attempt by setting LastAttempt, adding an empty Result, and
1356 // already setting NextAttempt in the future with exponential backoff. If we run
1357 // into trouble delivery below, at least we won't be bothering the receiving server
1358 // with our problems.
1359 // Delivery attempts: immediately, 7.5m, 15m, 30m, 1h, 2h (send delayed DSN), 4h,
1360 // 8h, 16h (send permanent failure DSN).
1361 // ../rfc/5321:3703
1362 // todo future: make the back off times configurable. ../rfc/5321:3713
1363 now := time.Now()
1364 var backoff time.Duration
1365 var origNextAttempt time.Time
1366 prepare := func() error {
1367 // Refresh message within transaction.
1368 m0 = Msg{ID: m0.ID}
1369 if err := xtx.Get(&m0); err != nil {
1370 return fmt.Errorf("get message to be delivered: %v", err)
1371 }
1372
1373 backoff = time.Duration(7*60+30+jitter.Intn(10)-5) * time.Second
1374 for i := 0; i < m0.Attempts; i++ {
1375 backoff *= time.Duration(2)
1376 }
1377 m0.Attempts++
1378 origNextAttempt = m0.NextAttempt
1379 m0.LastAttempt = &now
1380 m0.NextAttempt = now.Add(backoff)
1381 m0.Results = append(m0.Results, MsgResult{Start: now, Error: resultErrorDelivering})
1382 if err := xtx.Update(&m0); err != nil {
1383 return fmt.Errorf("update message to be delivered: %v", err)
1384 }
1385 return nil
1386 }
1387 if err := prepare(); err != nil {
1388 qlog.Errorx("storing delivery attempt", err, slog.Int64("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
1389 return
1390 }
1391
1392 var remoteMTA dsn.NameIP // Zero value, will not be included in DSN. ../rfc/3464:1027
1393
1394 // Check if recipient is on suppression list. If so, fail delivery.
1395 path := smtp.Path{Localpart: m0.RecipientLocalpart, IPDomain: m0.RecipientDomain}
1396 baseAddr := baseAddress(path).XString(true)
1397 qsup := bstore.QueryTx[webapi.Suppression](xtx)
1398 qsup.FilterNonzero(webapi.Suppression{Account: m0.SenderAccount, BaseAddress: baseAddr})
1399 exists, err := qsup.Exists()
1400 if err != nil || exists {
1401 if err != nil {
1402 qlog.Errorx("checking whether recipient address is in suppression list", err)
1403 } else {
1404 err := fmt.Errorf("not delivering to recipient address %s: %w", path.XString(true), errSuppressed)
1405 err = smtpclient.Error{Permanent: true, Err: err}
1406 failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, err)
1407 }
1408 err = xtx.Commit()
1409 qlog.Check(err, "commit processing failure to deliver messages")
1410 xtx = nil
1411 kick()
1412 return
1413 }
1414
1415 resolveTransport := func(mm Msg) (string, config.Transport, bool) {
1416 if mm.Transport != "" {
1417 transport, ok := mox.Conf.Static.Transports[mm.Transport]
1418 if !ok {
1419 return "", config.Transport{}, false
1420 }
1421 return mm.Transport, transport, ok
1422 }
1423 route := findRoute(mm.Attempts, mm)
1424 return route.Transport, route.ResolvedTransport, true
1425 }
1426
1427 // Find route for transport to use for delivery attempt.
1428 m0.Attempts--
1429 transportName, transport, transportOK := resolveTransport(m0)
1430 m0.Attempts++
1431 if !transportOK {
1432 failMsgsTx(qlog, xtx, []*Msg{&m0}, m0.DialedIPs, backoff, remoteMTA, fmt.Errorf("cannot find transport %q", m0.Transport))
1433 err = xtx.Commit()
1434 qlog.Check(err, "commit processing failure to deliver messages")
1435 xtx = nil
1436 kick()
1437 return
1438 }
1439
1440 if transportName != "" {
1441 qlog = qlog.With(slog.String("transport", transportName))
1442 qlog.Debug("delivering with transport")
1443 }
1444
1445 // Attempt to gather more recipients for this identical message, only with the same
1446 // recipient domain, and under the same conditions (recipientdomain, attempts,
1447 // requiretls, transport). ../rfc/5321:3759
1448 msgs := []*Msg{&m0}
1449 if m0.BaseID != 0 {
1450 gather := func() error {
1451 q := bstore.QueryTx[Msg](xtx)
1452 q.FilterNonzero(Msg{BaseID: m0.BaseID, RecipientDomainStr: m0.RecipientDomainStr, Attempts: m0.Attempts - 1})
1453 q.FilterNotEqual("ID", m0.ID)
1454 q.FilterLessEqual("NextAttempt", origNextAttempt)
1455 q.FilterEqual("Hold", false)
1456 err := q.ForEach(func(xm Msg) error {
1457 mrtls := m0.RequireTLS != nil
1458 xmrtls := xm.RequireTLS != nil
1459 if mrtls != xmrtls || mrtls && *m0.RequireTLS != *xm.RequireTLS {
1460 return nil
1461 }
1462 tn, _, ok := resolveTransport(xm)
1463 if ok && tn == transportName {
1464 msgs = append(msgs, &xm)
1465 }
1466 return nil
1467 })
1468 if err != nil {
1469 return fmt.Errorf("looking up more recipients: %v", err)
1470 }
1471
1472 // Mark these additional messages as attempted too.
1473 for _, mm := range msgs[1:] {
1474 mm.Attempts++
1475 mm.NextAttempt = m0.NextAttempt
1476 mm.LastAttempt = m0.LastAttempt
1477 mm.Results = append(mm.Results, MsgResult{Start: now, Error: resultErrorDelivering})
1478 if err := xtx.Update(mm); err != nil {
1479 return fmt.Errorf("updating more message recipients for smtp transaction: %v", err)
1480 }
1481 }
1482 return nil
1483 }
1484 if err := gather(); err != nil {
1485 qlog.Errorx("error finding more recipients for message, will attempt to send to single recipient", err)
1486 msgs = msgs[:1]
1487 }
1488 }
1489
1490 if err := xtx.Commit(); err != nil {
1491 qlog.Errorx("commit of preparation to deliver", err, slog.Any("msgid", m0.ID))
1492 return
1493 }
1494 xtx = nil
1495
1496 if len(msgs) > 1 {
1497 ids := make([]int64, len(msgs))
1498 rcpts := make([]smtp.Path, len(msgs))
1499 for i, m := range msgs {
1500 ids[i] = m.ID
1501 rcpts[i] = m.Recipient()
1502 }
1503 qlog.Debug("delivering to multiple recipients", slog.Any("msgids", ids), slog.Any("recipients", rcpts))
1504 } else {
1505 qlog.Debug("delivering to single recipient", slog.Any("msgid", m0.ID), slog.Any("recipient", m0.Recipient()))
1506 }
1507
1508 if Localserve {
1509 deliverLocalserve(ctx, qlog, msgs, backoff)
1510 return
1511 }
1512
1513 // We gather TLS connection successes and failures during delivery, and we store
1514 // them in tlsrptdb. Every 24 hours we send an email with a report to the recipient
1515 // domains that opt in via a TLSRPT DNS record. For us, the tricky part is
1516 // collecting all reporting information. We've got several TLS modes
1517 // (opportunistic, DANE and/or MTA-STS (PKIX), overrides due to Require TLS).
1518 // Failures can happen at various levels: MTA-STS policies (apply to whole delivery
1519 // attempt/domain), MX targets (possibly multiple per delivery attempt, both for
1520 // MTA-STS and DANE).
1521 //
1522 // Once the SMTP client has tried a TLS handshake, we register success/failure,
1523 // regardless of what happens next on the connection. We also register failures
1524 // when they happen before we get to the SMTP client, but only if they are related
1525 // to TLS (and some DNSSEC).
1526 var recipientDomainResult tlsrpt.Result
1527 var hostResults []tlsrpt.Result
1528 defer func() {
1529 if mox.Conf.Static.NoOutgoingTLSReports || m0.RecipientDomain.IsIP() {
1530 return
1531 }
1532
1533 now := time.Now()
1534 dayUTC := now.UTC().Format("20060102")
1535
1536 // See if this contains a failure. If not, we'll mark TLS results for delivering
1537 // DMARC reports SendReport false, so we won't as easily get into a report sending
1538 // loop.
1539 var failure bool
1540 for _, result := range hostResults {
1541 if result.Summary.TotalFailureSessionCount > 0 {
1542 failure = true
1543 break
1544 }
1545 }
1546 if recipientDomainResult.Summary.TotalFailureSessionCount > 0 {
1547 failure = true
1548 }
1549
1550 results := make([]tlsrptdb.TLSResult, 0, 1+len(hostResults))
1551 tlsaPolicyDomains := map[string]bool{}
1552 addResult := func(r tlsrpt.Result, isHost bool) {
1553 var zerotype tlsrpt.PolicyType
1554 if r.Policy.Type == zerotype {
1555 return
1556 }
1557
1558 // Ensure we store policy domain in unicode in database.
1559 policyDomain, err := dns.ParseDomain(r.Policy.Domain)
1560 if err != nil {
1561 qlog.Errorx("parsing policy domain for tls result", err, slog.String("policydomain", r.Policy.Domain))
1562 return
1563 }
1564
1565 if r.Policy.Type == tlsrpt.TLSA {
1566 tlsaPolicyDomains[policyDomain.ASCII] = true
1567 }
1568
1569 tlsResult := tlsrptdb.TLSResult{
1570 PolicyDomain: policyDomain.Name(),
1571 DayUTC: dayUTC,
1572 RecipientDomain: m0.RecipientDomain.Domain.Name(),
1573 IsHost: isHost,
1574 SendReport: !m0.IsTLSReport && (!m0.IsDMARCReport || failure),
1575 Results: []tlsrpt.Result{r},
1576 }
1577 results = append(results, tlsResult)
1578 }
1579 for _, result := range hostResults {
1580 addResult(result, true)
1581 }
1582 // If we were delivering to a mail host directly (not a domain with MX records), we
1583 // are more likely to get a TLSA policy than an STS policy. Don't potentially
1584 // confuse operators with both a tlsa and no-policy-found result.
1585 // todo spec: ../rfc/8460:440 an explicit no-sts-policy result would be useful.
1586 if recipientDomainResult.Policy.Type != tlsrpt.NoPolicyFound || !tlsaPolicyDomains[recipientDomainResult.Policy.Domain] {
1587 addResult(recipientDomainResult, false)
1588 }
1589
1590 if len(results) > 0 {
1591 err := tlsrptdb.AddTLSResults(context.Background(), results)
1592 qlog.Check(err, "adding tls results to database for upcoming tlsrpt report")
1593 }
1594 }()
1595
1596 var dialer smtpclient.Dialer = &net.Dialer{}
1597 if transport.Submissions != nil {
1598 deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submissions, true, 465)
1599 } else if transport.Submission != nil {
1600 deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.Submission, false, 587)
1601 } else if transport.SMTP != nil {
1602 // todo future: perhaps also gather tlsrpt results for submissions.
1603 deliverSubmit(qlog, resolver, dialer, msgs, backoff, transportName, transport.SMTP, false, 25)
1604 } else {
1605 ourHostname := mox.Conf.Static.HostnameDomain
1606 if transport.Socks != nil {
1607 socksdialer, err := proxy.SOCKS5("tcp", transport.Socks.Address, nil, &net.Dialer{})
1608 if err != nil {
1609 failMsgsDB(qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer: %v", err))
1610 return
1611 } else if d, ok := socksdialer.(smtpclient.Dialer); !ok {
1612 failMsgsDB(qlog, msgs, msgs[0].DialedIPs, backoff, dsn.NameIP{}, fmt.Errorf("socks dialer is not a contextdialer"))
1613 return
1614 } else {
1615 dialer = d
1616 }
1617 ourHostname = transport.Socks.Hostname
1618 }
1619 recipientDomainResult, hostResults = deliverDirect(qlog, resolver, dialer, ourHostname, transportName, transport.Direct, msgs, backoff)
1620 }
1621}
1622
1623func findRoute(attempt int, m Msg) config.Route {
1624 routesAccount, routesDomain, routesGlobal := mox.Conf.Routes(m.SenderAccount, m.SenderDomain.Domain)
1625 if r, ok := findRouteInList(attempt, m, routesAccount); ok {
1626 return r
1627 }
1628 if r, ok := findRouteInList(attempt, m, routesDomain); ok {
1629 return r
1630 }
1631 if r, ok := findRouteInList(attempt, m, routesGlobal); ok {
1632 return r
1633 }
1634 return config.Route{}
1635}
1636
1637func findRouteInList(attempt int, m Msg, routes []config.Route) (config.Route, bool) {
1638 for _, r := range routes {
1639 if routeMatch(attempt, m, r) {
1640 return r, true
1641 }
1642 }
1643 return config.Route{}, false
1644}
1645
1646func routeMatch(attempt int, m Msg, r config.Route) bool {
1647 return attempt >= r.MinimumAttempts && routeMatchDomain(r.FromDomainASCII, m.SenderDomain.Domain) && routeMatchDomain(r.ToDomainASCII, m.RecipientDomain.Domain)
1648}
1649
1650func routeMatchDomain(l []string, d dns.Domain) bool {
1651 if len(l) == 0 {
1652 return true
1653 }
1654 for _, e := range l {
1655 if d.ASCII == e || strings.HasPrefix(e, ".") && (d.ASCII == e[1:] || strings.HasSuffix(d.ASCII, e)) {
1656 return true
1657 }
1658 }
1659 return false
1660}
1661
1662// Returns string representing delivery result for err, and number of delivered and
1663// failed messages.
1664//
1665// Values: ok, okpartial, timeout, canceled, temperror, permerror, error.
1666func deliveryResult(err error, delivered, failed int) string {
1667 var cerr smtpclient.Error
1668 switch {
1669 case err == nil:
1670 if delivered == 0 {
1671 return "error"
1672 } else if failed > 0 {
1673 return "okpartial"
1674 }
1675 return "ok"
1676 case errors.Is(err, os.ErrDeadlineExceeded), errors.Is(err, context.DeadlineExceeded):
1677 return "timeout"
1678 case errors.Is(err, context.Canceled):
1679 return "canceled"
1680 case errors.As(err, &cerr):
1681 if cerr.Permanent {
1682 return "permerror"
1683 }
1684 return "temperror"
1685 }
1686 return "error"
1687}
1688