1package dmarcdb
2
3// Sending TLS reports and DMARC reports is very similar. See ../dmarcdb/eval.go:/similar and ../tlsrptsend/send.go:/similar.
4
5import (
6 "compress/gzip"
7 "context"
8 "encoding/xml"
9 "errors"
10 "fmt"
11 "io"
12 "mime"
13 "mime/multipart"
14 "net/textproto"
15 "net/url"
16 "os"
17 "path/filepath"
18 "runtime/debug"
19 "sort"
20 "strings"
21 "sync"
22 "time"
23
24 "golang.org/x/exp/maps"
25 "golang.org/x/exp/slices"
26
27 "github.com/prometheus/client_golang/prometheus"
28 "github.com/prometheus/client_golang/prometheus/promauto"
29
30 "github.com/mjl-/bstore"
31
32 "github.com/mjl-/mox/dkim"
33 "github.com/mjl-/mox/dmarc"
34 "github.com/mjl-/mox/dmarcrpt"
35 "github.com/mjl-/mox/dns"
36 "github.com/mjl-/mox/message"
37 "github.com/mjl-/mox/metrics"
38 "github.com/mjl-/mox/mlog"
39 "github.com/mjl-/mox/mox-"
40 "github.com/mjl-/mox/moxio"
41 "github.com/mjl-/mox/moxvar"
42 "github.com/mjl-/mox/publicsuffix"
43 "github.com/mjl-/mox/queue"
44 "github.com/mjl-/mox/smtp"
45 "github.com/mjl-/mox/store"
46)
47
// Prometheus counters for outgoing DMARC reports.
var (
	// metricReport counts report messages that were successfully added to the
	// delivery queue.
	metricReport = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "mox_dmarcdb_report_queued_total",
			Help: "Total messages with DMARC aggregate/error reports queued.",
		},
	)
	// metricReportError counts failures while sending reports: a failed reporting
	// round, a failed report for a domain, or a failure to queue a message.
	metricReportError = promauto.NewCounter(
		prometheus.CounterOpts{
			Name: "mox_dmarcdb_report_error_total",
			Help: "Total errors while composing or queueing DMARC aggregate/error reports.",
		},
	)
)
62
var (
	// EvalDBTypes lists the types stored in the evaluations database. It is passed
	// to bstore.Open when the database is opened.
	EvalDBTypes = []any{Evaluation{}, SuppressAddress{}} // Types stored in DB.
	// EvalDB is exported for backups. For incoming deliveries the SMTP server adds
	// evaluations to the database. Every hour, a goroutine wakes up that gathers
	// evaluations from the last hour(s), sends a report, and removes the evaluations
	// from the database. It is nil until the database is first opened by evalDB.
	EvalDB *bstore.DB
	// evalMutex guards the lazy opening of, and assignment to, EvalDB in evalDB.
	evalMutex sync.Mutex
)
71
// Evaluation is the result of an evaluation of a DMARC policy, to be included
// in a DMARC report. It is one of the types stored in the evaluations database
// (see EvalDBTypes).
type Evaluation struct {
	ID int64

	// Domain where DMARC policy was found, could be the organizational domain while
	// evaluation was for a subdomain. Unicode. Same as domain found in
	// PolicyPublished. A separate field for its index.
	PolicyDomain string `bstore:"index"`

	// Time of evaluation, determines which report (covering whole hours) this
	// evaluation will be included in.
	Evaluated time.Time `bstore:"default now"`

	// If optional, this evaluation is not a reason to send a DMARC report, but it will
	// be included when a report is sent due to other non-optional evaluations. Set for
	// evaluations of incoming DMARC reports. We don't want such deliveries causing us to
	// send a report, or we would keep exchanging reporting messages forever. Also set
	// for when evaluation is a DMARC reject for domains we haven't positively
	// interacted with, to prevent being used to flood an unsuspecting domain with
	// reports.
	Optional bool

	// Effective aggregate reporting interval in hours, between 1 and 24. The interval
	// in seconds from the policy is rounded up to whole hours and then lowered to the
	// largest of 24, 12, 8, 6, 4, 3, 2 or 1 that does not exceed it, so the value
	// always divides 24. Set by AddEvaluation.
	IntervalHours int

	// "rua" in DMARC record, we only store evaluations for records with aggregate
	// reporting addresses, so always non-empty.
	Addresses []string

	// Policy used for evaluation. We don't store the "fo" field for failure reporting
	// options, since we don't send failure reports for individual messages.
	PolicyPublished dmarcrpt.PolicyPublished

	// For "row" in a report record.
	SourceIP        string
	Disposition     dmarcrpt.Disposition
	AlignedDKIMPass bool
	AlignedSPFPass  bool
	OverrideReasons []dmarcrpt.PolicyOverrideReason

	// For "identifiers" in a report record.
	EnvelopeTo   string
	EnvelopeFrom string
	HeaderFrom   string

	// For "auth_results" in a report record.
	DKIMResults []dmarcrpt.DKIMAuthResult
	SPFResults  []dmarcrpt.SPFAuthResult
}
122
// SuppressAddress is a reporting address for which outgoing DMARC reports
// will be suppressed for a period. It is one of the types stored in the
// evaluations database (see EvalDBTypes).
type SuppressAddress struct {
	ID       int64
	Inserted time.Time `bstore:"default now"`
	// Address that reports are not sent to. When sending, it is matched against the
	// string form of the recipient's SMTP path.
	ReportingAddress string `bstore:"unique"`
	// Reports are suppressed as long as this time lies in the future.
	Until   time.Time `bstore:"nonzero"`
	Comment string
}
132
133var dmarcResults = map[bool]dmarcrpt.DMARCResult{
134 false: dmarcrpt.DMARCFail,
135 true: dmarcrpt.DMARCPass,
136}
137
138// ReportRecord turns an evaluation into a record that can be included in a
139// report.
140func (e Evaluation) ReportRecord(count int) dmarcrpt.ReportRecord {
141 return dmarcrpt.ReportRecord{
142 Row: dmarcrpt.Row{
143 SourceIP: e.SourceIP,
144 Count: count,
145 PolicyEvaluated: dmarcrpt.PolicyEvaluated{
146 Disposition: e.Disposition,
147 DKIM: dmarcResults[e.AlignedDKIMPass],
148 SPF: dmarcResults[e.AlignedSPFPass],
149 Reasons: e.OverrideReasons,
150 },
151 },
152 Identifiers: dmarcrpt.Identifiers{
153 EnvelopeTo: e.EnvelopeTo,
154 EnvelopeFrom: e.EnvelopeFrom,
155 HeaderFrom: e.HeaderFrom,
156 },
157 AuthResults: dmarcrpt.AuthResults{
158 DKIM: e.DKIMResults,
159 SPF: e.SPFResults,
160 },
161 }
162}
163
164func evalDB(ctx context.Context) (rdb *bstore.DB, rerr error) {
165 evalMutex.Lock()
166 defer evalMutex.Unlock()
167 if EvalDB == nil {
168 p := mox.DataDirPath("dmarceval.db")
169 os.MkdirAll(filepath.Dir(p), 0770)
170 db, err := bstore.Open(ctx, p, &bstore.Options{Timeout: 5 * time.Second, Perm: 0660}, EvalDBTypes...)
171 if err != nil {
172 return nil, err
173 }
174 EvalDB = db
175 }
176 return EvalDB, nil
177}
178
// intervalOpts holds the reporting intervals in hours, other than 1, that
// divide 24. Sorted from large to small.
var intervalOpts = []int{24, 12, 8, 6, 4, 3, 2}

// intervalHours turns a reporting interval in seconds into an interval in
// hours that divides 24. The seconds are rounded up to whole hours, after
// which the largest option not exceeding that number of hours is returned.
// Anything below 2 hours results in 1.
func intervalHours(seconds int) int {
	const secondsPerHour = 3600

	hours := (seconds + secondsPerHour - 1) / secondsPerHour
	for i := 0; i < len(intervalOpts); i++ {
		if opt := intervalOpts[i]; opt <= hours {
			return opt
		}
	}
	return 1
}
190
191// AddEvaluation adds the result of a DMARC evaluation for an incoming message
192// to the database.
193//
194// AddEvaluation sets Evaluation.IntervalHours based on
195// aggregateReportingIntervalSeconds.
196func AddEvaluation(ctx context.Context, aggregateReportingIntervalSeconds int, e *Evaluation) error {
197 e.IntervalHours = intervalHours(aggregateReportingIntervalSeconds)
198
199 db, err := evalDB(ctx)
200 if err != nil {
201 return err
202 }
203
204 e.ID = 0
205 return db.Insert(ctx, e)
206}
207
208// Evaluations returns all evaluations in the database.
209func Evaluations(ctx context.Context) ([]Evaluation, error) {
210 db, err := evalDB(ctx)
211 if err != nil {
212 return nil, err
213 }
214
215 q := bstore.QueryDB[Evaluation](ctx, db)
216 q.SortAsc("Evaluated")
217 return q.List()
218}
219
// EvaluationStat summarizes stored evaluations, for inclusion in an upcoming
// aggregate report, for a domain.
type EvaluationStat struct {
	// Policy domain of the evaluations, parsed from Evaluation.PolicyDomain.
	Domain dns.Domain
	// Distinct dispositions seen in the evaluations, in order of first occurrence.
	Dispositions []string
	// Number of evaluations stored for the domain.
	Count int
	// Whether at least one evaluation is non-optional, i.e. a reason to send a report.
	SendReport bool
}
228
229// EvaluationStats returns evaluation counts and report-sending status per domain.
230func EvaluationStats(ctx context.Context) (map[string]EvaluationStat, error) {
231 db, err := evalDB(ctx)
232 if err != nil {
233 return nil, err
234 }
235
236 r := map[string]EvaluationStat{}
237
238 err = bstore.QueryDB[Evaluation](ctx, db).ForEach(func(e Evaluation) error {
239 if stat, ok := r[e.PolicyDomain]; ok {
240 if !slices.Contains(stat.Dispositions, string(e.Disposition)) {
241 stat.Dispositions = append(stat.Dispositions, string(e.Disposition))
242 }
243 stat.Count++
244 stat.SendReport = stat.SendReport || !e.Optional
245 r[e.PolicyDomain] = stat
246 } else {
247 dom, err := dns.ParseDomain(e.PolicyDomain)
248 if err != nil {
249 return fmt.Errorf("parsing domain %q: %v", e.PolicyDomain, err)
250 }
251 r[e.PolicyDomain] = EvaluationStat{
252 Domain: dom,
253 Dispositions: []string{string(e.Disposition)},
254 Count: 1,
255 SendReport: !e.Optional,
256 }
257 }
258 return nil
259 })
260 return r, err
261}
262
263// EvaluationsDomain returns all evaluations for a domain.
264func EvaluationsDomain(ctx context.Context, domain dns.Domain) ([]Evaluation, error) {
265 db, err := evalDB(ctx)
266 if err != nil {
267 return nil, err
268 }
269
270 q := bstore.QueryDB[Evaluation](ctx, db)
271 q.FilterNonzero(Evaluation{PolicyDomain: domain.Name()})
272 q.SortAsc("Evaluated")
273 return q.List()
274}
275
276// RemoveEvaluationsDomain removes evaluations for domain so they won't be sent in
277// an aggregate report.
278func RemoveEvaluationsDomain(ctx context.Context, domain dns.Domain) error {
279 db, err := evalDB(ctx)
280 if err != nil {
281 return err
282 }
283
284 q := bstore.QueryDB[Evaluation](ctx, db)
285 q.FilterNonzero(Evaluation{PolicyDomain: domain.Name()})
286 _, err = q.Delete()
287 return err
288}
289
290var jitterRand = mox.NewPseudoRand()
291
292// time to sleep until next whole hour t, replaced by tests.
293// Jitter so we don't cause load at exactly whole hours, other processes may
294// already be doing that.
295var jitteredTimeUntil = func(t time.Time) time.Duration {
296 return time.Until(t.Add(time.Duration(30+jitterRand.Intn(60)) * time.Second))
297}
298
299// Start launches a goroutine that wakes up at each whole hour (plus jitter) and
300// sends DMARC reports to domains that requested them.
301func Start(resolver dns.Resolver) {
302 go func() {
303 log := mlog.New("dmarcdb")
304
305 defer func() {
306 // In case of panic don't take the whole program down.
307 x := recover()
308 if x != nil {
309 log.Error("recover from panic", mlog.Field("panic", x))
310 debug.PrintStack()
311 metrics.PanicInc(metrics.Dmarcdb)
312 }
313 }()
314
315 timer := time.NewTimer(time.Hour)
316 defer timer.Stop()
317
318 ctx := mox.Shutdown
319
320 db, err := evalDB(ctx)
321 if err != nil {
322 log.Errorx("opening dmarc evaluations database for sending dmarc aggregate reports, not sending reports", err)
323 return
324 }
325
326 for {
327 now := time.Now()
328 nextEnd := nextWholeHour(now)
329 timer.Reset(jitteredTimeUntil(nextEnd))
330
331 select {
332 case <-ctx.Done():
333 log.Info("dmarc aggregate report sender shutting down")
334 return
335 case <-timer.C:
336 }
337
338 // Gather report intervals we want to process now. Multiples of hours that can
339 // divide 24, starting from UTC.
340 // ../rfc/7489:1750
341 utchour := nextEnd.UTC().Hour()
342 if utchour == 0 {
343 utchour = 24
344 }
345 intervals := []int{}
346 for _, ival := range intervalOpts {
347 if ival*(utchour/ival) == utchour {
348 intervals = append(intervals, ival)
349 }
350 }
351 intervals = append(intervals, 1)
352
353 // Remove evaluations older than 48 hours (2 reports with the default and maximum
354 // 24 hour interval). They should have been processed by now. We may have kept them
355 // during temporary errors, but persistent temporary errors shouldn't fill up our
356 // database. This also cleans up evaluations that were all optional for a domain.
357 _, err := bstore.QueryDB[Evaluation](ctx, db).FilterLess("Evaluated", nextEnd.Add(-48*time.Hour)).Delete()
358 log.Check(err, "removing stale dmarc evaluations from database")
359
360 clog := log.WithCid(mox.Cid())
361 clog.Info("sending dmarc aggregate reports", mlog.Field("end", nextEnd.UTC()), mlog.Field("intervals", intervals))
362 if err := sendReports(ctx, clog, resolver, db, nextEnd, intervals); err != nil {
363 clog.Errorx("sending dmarc aggregate reports", err)
364 metricReportError.Inc()
365 } else {
366 clog.Info("finished sending dmarc aggregate reports")
367 }
368 }
369 }()
370}
371
// nextWholeHour returns the start of the hour following the hour that now is
// in, in the location of now. A time exactly on a whole hour results in the
// next whole hour.
func nextWholeHour(now time.Time) time.Time {
	later := now.Add(time.Hour)
	year, month, day := later.Date()
	return time.Date(year, month, day, later.Hour(), 0, 0, 0, later.Location())
}
377
// sleepBetween waits for the duration between sending two reports. It returns
// true after the full duration has passed, and false if the context was done
// before that.
//
// We don't send reports at full speed. In the future, we could try to stretch out
// reports a bit smarter. E.g. over 5 minutes with some minimum interval, and
// perhaps faster and in parallel when there are lots of reports. Perhaps also
// depending on reporting interval (faster for 1h, slower for 24h).
// Replaced by tests.
var sleepBetween = func(ctx context.Context, between time.Duration) (ok bool) {
	timer := time.NewTimer(between)
	// Stopping a timer that already fired is harmless.
	defer timer.Stop()

	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}
393
394// sendReports gathers all policy domains that have evaluations that should
395// receive a DMARC report and sends a report to each.
396func sendReports(ctx context.Context, log *mlog.Log, resolver dns.Resolver, db *bstore.DB, endTime time.Time, intervals []int) error {
397 ivals := make([]any, len(intervals))
398 for i, v := range intervals {
399 ivals[i] = v
400 }
401
402 destDomains := map[string]bool{}
403
404 // Gather all domains that we plan to send to.
405 nsend := 0
406 q := bstore.QueryDB[Evaluation](ctx, db)
407 q.FilterLess("Evaluated", endTime)
408 q.FilterEqual("IntervalHours", ivals...)
409 err := q.ForEach(func(e Evaluation) error {
410 if !e.Optional && !destDomains[e.PolicyPublished.Domain] {
411 nsend++
412 }
413 destDomains[e.PolicyPublished.Domain] = destDomains[e.PolicyPublished.Domain] || !e.Optional
414 return nil
415 })
416 if err != nil {
417 return fmt.Errorf("looking for domains to send reports to: %v", err)
418 }
419
420 var wg sync.WaitGroup
421
422 // Sleep in between sending reports. We process hourly, and spread the reports over
423 // the hour, but with max 5 minute interval.
424 between := 45 * time.Minute
425 if nsend > 0 {
426 between /= time.Duration(nsend)
427 }
428 if between > 5*time.Minute {
429 between = 5 * time.Minute
430 }
431
432 // Attempt to send report to each domain.
433 n := 0
434 for d, send := range destDomains {
435 // Cleanup evaluations for domain with only optionals.
436 if !send {
437 removeEvaluations(ctx, log, db, endTime, d)
438 continue
439 }
440
441 if n > 0 {
442 if ok := sleepBetween(ctx, between); !ok {
443 return nil
444 }
445 }
446 n++
447
448 // Send in goroutine, so a slow process doesn't block progress.
449 wg.Add(1)
450 go func(domain string) {
451 defer func() {
452 // In case of panic don't take the whole program down.
453 x := recover()
454 if x != nil {
455 log.Error("unhandled panic in dmarcdb sendReports", mlog.Field("panic", x))
456 debug.PrintStack()
457 metrics.PanicInc(metrics.Dmarcdb)
458 }
459 }()
460 defer wg.Done()
461
462 rlog := log.WithCid(mox.Cid()).Fields(mlog.Field("domain", domain))
463 rlog.Info("sending dmarc report")
464 if _, err := sendReportDomain(ctx, rlog, resolver, db, endTime, domain); err != nil {
465 rlog.Errorx("sending dmarc aggregate report to domain", err)
466 metricReportError.Inc()
467 }
468 }(d)
469 }
470
471 wg.Wait()
472
473 return nil
474}
475
// recipient is an address to send an aggregate report to, as parsed from a
// "rua" URI in a DMARC record.
type recipient struct {
	// Address from the mailto URI.
	address smtp.Address
	// Maximum size in bytes the recipient is willing to accept, with the unit from
	// the URI applied. Zero means no limit.
	maxSize uint64
}
480
481func parseRecipient(log *mlog.Log, uri dmarc.URI) (r recipient, ok bool) {
482 log = log.Fields(mlog.Field("uri", uri.Address))
483
484 u, err := url.Parse(uri.Address)
485 if err != nil {
486 log.Debugx("parsing uri in dmarc record rua value", err)
487 return r, false
488 }
489 if !strings.EqualFold(u.Scheme, "mailto") {
490 log.Debug("skipping unrecognized scheme in dmarc record rua value")
491 return r, false
492 }
493 addr, err := smtp.ParseAddress(u.Opaque)
494 if err != nil {
495 log.Debugx("parsing mailto uri in dmarc record rua value", err)
496 return r, false
497 }
498
499 r = recipient{addr, uri.MaxSize}
500 // ../rfc/7489:1197
501 switch uri.Unit {
502 case "k", "K":
503 r.maxSize *= 1024
504 case "m", "M":
505 r.maxSize *= 1024 * 1024
506 case "g", "G":
507 r.maxSize *= 1024 * 1024 * 1024
508 case "t", "T":
509 // Oh yeah, terabyte-sized reports!
510 r.maxSize *= 1024 * 1024 * 1024 * 1024
511 case "":
512 default:
513 log.Debug("unrecognized max size unit in dmarc record rua value", mlog.Field("unit", uri.Unit))
514 return r, false
515 }
516
517 return r, true
518}
519
520func removeEvaluations(ctx context.Context, log *mlog.Log, db *bstore.DB, endTime time.Time, domain string) {
521 q := bstore.QueryDB[Evaluation](ctx, db)
522 q.FilterLess("Evaluated", endTime)
523 q.FilterNonzero(Evaluation{PolicyDomain: domain})
524 _, err := q.Delete()
525 log.Check(err, "removing evaluations after processing for dmarc aggregate report")
526}
527
// queueAdd adds a message to the delivery queue. It is a variable so it is
// replaceable for testing.
var queueAdd = queue.Add
530
531func sendReportDomain(ctx context.Context, log *mlog.Log, resolver dns.Resolver, db *bstore.DB, endTime time.Time, domain string) (cleanup bool, rerr error) {
532 dom, err := dns.ParseDomain(domain)
533 if err != nil {
534 return false, fmt.Errorf("parsing domain for sending reports: %v", err)
535 }
536
537 // We'll cleanup records by default.
538 cleanup = true
539 // If we encounter a temporary error we cancel cleanup of evaluations on error.
540 tempError := false
541
542 defer func() {
543 if !cleanup || tempError {
544 log.Debug("not cleaning up evaluations after attempting to send dmarc aggregate report")
545 } else {
546 removeEvaluations(ctx, log, db, endTime, domain)
547 }
548 }()
549
550 // We're going to build up this report.
551 report := dmarcrpt.Feedback{
552 Version: "1.0",
553 ReportMetadata: dmarcrpt.ReportMetadata{
554 OrgName: mox.Conf.Static.HostnameDomain.ASCII,
555 Email: "postmaster@" + mox.Conf.Static.HostnameDomain.ASCII,
556 // ReportID and DateRange are set after we've seen evaluations.
557 // Errors is filled below when we encounter problems.
558 },
559 // We'll fill the records below.
560 Records: []dmarcrpt.ReportRecord{},
561 }
562
563 var errors []string // For report.ReportMetaData.Errors
564
565 // Check if we should be sending a report at all: if there are rua URIs in the
566 // current DMARC record. The interval may have changed too, but we'll flush out our
567 // evaluations regardless. We always use the latest DMARC record when sending, but
568 // we'll lump all policies of the last interval into one report.
569 // ../rfc/7489:1714
570 status, _, record, _, _, err := dmarc.Lookup(ctx, resolver, dom)
571 if err != nil {
572 // todo future: we could perhaps still send this report, assuming the values we know. in case of temporary error, we could also schedule again regardless of next interval hour (we would now only retry a 24h-interval report after 24h passed).
573 // Remove records unless it was a temporary error. We'll try again next round.
574 cleanup = status != dmarc.StatusTemperror
575 return cleanup, fmt.Errorf("looking up current dmarc record for reporting address: %v", err)
576 }
577
578 var recipients []recipient
579
580 // Gather all aggregate reporting addresses to try to send to. We'll start with
581 // those in the initial DMARC record, but will follow external reporting addresses
582 // and possibly update the list.
583 for _, uri := range record.AggregateReportAddresses {
584 r, ok := parseRecipient(log, uri)
585 if !ok {
586 continue
587 }
588
589 // Check if domain of rua recipient has the same organizational domain as for the
590 // evaluations. If not, we need to verify we are allowed to send.
591 ruaOrgDom := publicsuffix.Lookup(ctx, r.address.Domain)
592 evalOrgDom := publicsuffix.Lookup(ctx, dom)
593
594 if ruaOrgDom == evalOrgDom {
595 recipients = append(recipients, r)
596 continue
597 }
598
599 // Verify and follow addresses in other organizational domain through
600 // <policydomain>._report._dmarc.<host> lookup.
601 // ../rfc/7489:1556
602 accepts, status, records, _, _, err := dmarc.LookupExternalReportsAccepted(ctx, resolver, evalOrgDom, r.address.Domain)
603 log.Debugx("checking if rua address with different organization domain has opted into receiving dmarc reports", err,
604 mlog.Field("policydomain", evalOrgDom),
605 mlog.Field("destinationdomain", r.address.Domain),
606 mlog.Field("accepts", accepts),
607 mlog.Field("status", status))
608 if status == dmarc.StatusTemperror {
609 // With a temporary error, we'll try to get the report the delivered anyway,
610 // perhaps there are multiple recipients.
611 // ../rfc/7489:1578
612 tempError = true
613 errors = append(errors, "temporary error checking authorization for report delegation to external address")
614 }
615 if !accepts {
616 errors = append(errors, fmt.Sprintf("rua %s is external domain that does not opt-in to receiving dmarc records through _report dmarc record", r.address))
617 continue
618 }
619
620 // We can follow a _report DMARC DNS record once. In that record, a domain may
621 // specify alternative addresses that we should send reports to instead. Such
622 // alternative address(es) must have the same host. If not, we ignore the new
623 // value. Behaviour for multiple records and/or multiple new addresses is
624 // underspecified. We'll replace an address with one or more new addresses, and
625 // keep the original if there was no candidate (which covers the case of invalid
626 // alternative addresses and no new address specified).
627 // ../rfc/7489:1600
628 foundReplacement := false
629 rlog := log.Fields(mlog.Field("followedaddress", uri.Address))
630 for _, record := range records {
631 for _, exturi := range record.AggregateReportAddresses {
632 extr, ok := parseRecipient(rlog, exturi)
633 if !ok {
634 continue
635 }
636 if extr.address.Domain != r.address.Domain {
637 rlog.Debug("rua address in external _report dmarc record has different host than initial dmarc record, ignoring new name", mlog.Field("externaladdress", extr.address))
638 errors = append(errors, fmt.Sprintf("rua %s is external domain with a replacement address %s with different host", r.address, extr.address))
639 } else {
640 rlog.Debug("using replacement rua address from external _report dmarc record", mlog.Field("externaladdress", extr.address))
641 foundReplacement = true
642 recipients = append(recipients, extr)
643 }
644 }
645 }
646 if !foundReplacement {
647 recipients = append(recipients, r)
648 }
649 }
650
651 if len(recipients) == 0 {
652 // No reports requested, perfectly fine, no work to do for us.
653 log.Debug("no aggregate reporting addresses configured")
654 return true, nil
655 }
656
657 // We count idential records. Can be common with a domain sending quite some email.
658 // Though less if the sending domain has many IPs. In the future, we may want to
659 // remove some details from records so we can aggregate them into fewer rows.
660 type recordCount struct {
661 dmarcrpt.ReportRecord
662 count int
663 }
664 counts := map[string]recordCount{}
665
666 var first, last Evaluation // For daterange.
667 var sendReport bool
668
669 q := bstore.QueryDB[Evaluation](ctx, db)
670 q.FilterLess("Evaluated", endTime)
671 q.FilterNonzero(Evaluation{PolicyDomain: domain})
672 q.SortAsc("Evaluated")
673 err = q.ForEach(func(e Evaluation) error {
674 if first.ID == 0 {
675 first = e
676 }
677 last = e
678
679 record := e.ReportRecord(0)
680
681 // todo future: if we see many unique records from a single ip (exact ipv4 or ipv6 subnet), we may want to coalesce them into a single record, leaving out the fields that make them: a single ip could cause a report to contain many records with many unique domains, selectors, etc. it may compress relatively well, but the reports could still be huge.
682
683 // Simple but inefficient way to aggregate identical records. We may want to turn
684 // records into smaller representation in the future.
685 recbuf, err := xml.Marshal(record)
686 if err != nil {
687 return fmt.Errorf("xml marshal of report record: %v", err)
688 }
689 recstr := string(recbuf)
690 counts[recstr] = recordCount{record, counts[recstr].count + 1}
691 if !e.Optional {
692 sendReport = true
693 }
694 return nil
695 })
696 if err != nil {
697 return false, fmt.Errorf("gathering evaluations for report: %v", err)
698 }
699
700 if !sendReport {
701 log.Debug("no non-optional evaluations for domain, not sending dmarc aggregate report")
702 return true, nil
703 }
704
705 // Set begin and end date range. We try to set it to whole intervals as requested
706 // by the domain owner. The typical, default and maximum interval is 24 hours. But
707 // we allow any whole number of hours that can divide 24 hours. If we have an
708 // evaluation that is older, we may have had a failure to send earlier. We include
709 // those earlier intervals in this report as well.
710 //
711 // Although "end" could be interpreted as exclusive, to be on the safe side
712 // regarding client behaviour, and (related) to mimic large existing DMARC report
713 // senders, we set it to the last second of the period this report covers.
714 report.ReportMetadata.DateRange.End = endTime.Add(-time.Second).Unix()
715 interval := time.Duration(first.IntervalHours) * time.Hour
716 beginTime := endTime.Add(-interval)
717 for first.Evaluated.Before(beginTime) {
718 beginTime = beginTime.Add(-interval)
719 }
720 report.ReportMetadata.DateRange.Begin = beginTime.Unix()
721
722 // yyyymmddHH, we only send one report per hour, so should be unique per policy
723 // domain. We also add a truly unique id based on first evaluation id used without
724 // revealing the number of evaluations we have. Reuse of ReceivedID is not great,
725 // but shouldn't hurt.
726 report.ReportMetadata.ReportID = endTime.UTC().Format("20060102.15") + "." + mox.ReceivedID(first.ID)
727
728 // We may include errors we encountered when composing the report. We
729 // don't currently include errors about dmarc evaluations, e.g. DNS
730 // lookup errors during incoming deliveries.
731 report.ReportMetadata.Errors = errors
732
733 // We'll fill this with the last-used record, not the one we fetch fresh from DSN.
734 // They will almost always be the same, but if not, the fresh record was never
735 // actually used for evaluations, so no point in reporting it.
736 report.PolicyPublished = last.PolicyPublished
737
738 // Process records in-order for testable results.
739 recstrs := maps.Keys(counts)
740 sort.Strings(recstrs)
741 for _, recstr := range recstrs {
742 rc := counts[recstr]
743 rc.ReportRecord.Row.Count = rc.count
744 report.Records = append(report.Records, rc.ReportRecord)
745 }
746
747 reportFile, err := store.CreateMessageTemp("dmarcreportout")
748 if err != nil {
749 return false, fmt.Errorf("creating temporary file for outgoing dmarc aggregate report: %v", err)
750 }
751 defer store.CloseRemoveTempFile(log, reportFile, "generated dmarc aggregate report")
752
753 gzw := gzip.NewWriter(reportFile)
754 _, err = fmt.Fprint(gzw, xml.Header)
755 enc := xml.NewEncoder(gzw)
756 enc.Indent("", "\t") // Keep up pretention that xml is human-readable.
757 if err == nil {
758 err = enc.Encode(report)
759 }
760 if err == nil {
761 err = enc.Close()
762 }
763 if err == nil {
764 err = gzw.Close()
765 }
766 if err != nil {
767 return true, fmt.Errorf("writing dmarc aggregate report as xml with gzip: %v", err)
768 }
769
770 msgf, err := store.CreateMessageTemp("dmarcreportmsgout")
771 if err != nil {
772 return false, fmt.Errorf("creating temporary message file with outgoing dmarc aggregate report: %v", err)
773 }
774 defer store.CloseRemoveTempFile(log, msgf, "message with generated dmarc aggregate report")
775
776 // We are sending reports from our host's postmaster address. In a
777 // typical setup the host is a subdomain of a configured domain with
778 // DKIM keys, so we can DKIM-sign our reports. SPF should pass anyway.
779 // A single report can contain deliveries from a single policy domain
780 // to multiple of our configured domains.
781 from := smtp.Address{Localpart: "postmaster", Domain: mox.Conf.Static.HostnameDomain}
782
783 // Subject follows the form in RFC. ../rfc/7489:1871
784 subject := fmt.Sprintf("Report Domain: %s Submitter: %s Report-ID: <%s>", dom.ASCII, mox.Conf.Static.HostnameDomain.ASCII, report.ReportMetadata.ReportID)
785
786 // Human-readable part for convenience. ../rfc/7489:1803
787 text := fmt.Sprintf(`Attached is an aggregate DMARC report with results of evaluations of the DMARC
788policy of your domain for messages received by us that have your domain in the
789message From header. You are receiving this message because your address is
790specified in the "rua" field of the DMARC record for your domain.
791
792Report domain: %s
793Submitter: %s
794Report-ID: %s
795Period: %s - %s UTC
796`, dom, mox.Conf.Static.HostnameDomain, report.ReportMetadata.ReportID, beginTime.UTC().Format(time.DateTime), endTime.UTC().Format(time.DateTime))
797
798 // The attached file follows the naming convention from the RFC. ../rfc/7489:1812
799 reportFilename := fmt.Sprintf("%s!%s!%d!%d.xml.gz", mox.Conf.Static.HostnameDomain.ASCII, dom.ASCII, beginTime.Unix(), endTime.Add(-time.Second).Unix())
800
801 var addrs []message.NameAddress
802 for _, rcpt := range recipients {
803 addrs = append(addrs, message.NameAddress{Address: rcpt.address})
804 }
805
806 // Compose the message.
807 msgPrefix, has8bit, smtputf8, messageID, err := composeAggregateReport(ctx, log, msgf, from, addrs, subject, text, reportFilename, reportFile)
808 if err != nil {
809 return false, fmt.Errorf("composing message with outgoing dmarc aggregate report: %v", err)
810 }
811
812 // Get size of message after all compression and encodings (base64 makes it big
813 // again), and go through potentials recipients (rua). If they are willing to
814 // accept the report, queue it.
815 msgInfo, err := msgf.Stat()
816 if err != nil {
817 return false, fmt.Errorf("stat message with outgoing dmarc aggregate report: %v", err)
818 }
819 msgSize := int64(len(msgPrefix)) + msgInfo.Size()
820 var queued bool
821 for _, rcpt := range recipients {
822 // If recipient is on suppression list, we won't queue the reporting message.
823 q := bstore.QueryDB[SuppressAddress](ctx, db)
824 q.FilterNonzero(SuppressAddress{ReportingAddress: rcpt.address.Path().String()})
825 q.FilterGreater("Until", time.Now())
826 exists, err := q.Exists()
827 if err != nil {
828 return false, fmt.Errorf("querying suppress list: %v", err)
829 }
830 if exists {
831 log.Info("suppressing outgoing dmarc aggregate report", mlog.Field("reportingaddress", rcpt.address))
832 continue
833 }
834
835 // Only send to addresses where we don't exceed their size limit. The RFC mentions
836 // the size of the report, but then continues about the size after compression and
837 // transport encodings (i.e. gzip and the mime base64 attachment, so the intention
838 // is probably to compare against the size of the message that contains the report.
839 // ../rfc/7489:1773
840 if rcpt.maxSize > 0 && msgSize > int64(rcpt.maxSize) {
841 continue
842 }
843
844 qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, from.Path(), rcpt.address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil)
845 // Don't try as long as regular deliveries, and stop before we would send the
846 // delayed DSN. Though we also won't send that due to IsDMARCReport.
847 qm.MaxAttempts = 5
848 qm.IsDMARCReport = true
849
850 err = queueAdd(ctx, log, &qm, msgf)
851 if err != nil {
852 tempError = true
853 log.Errorx("queueing message with dmarc aggregate report", err)
854 metricReportError.Inc()
855 } else {
856 log.Debug("dmarc aggregate report queued", mlog.Field("recipient", rcpt.address))
857 queued = true
858 metricReport.Inc()
859 }
860 }
861
862 if !queued {
863 if err := sendErrorReport(ctx, log, db, from, addrs, dom, report.ReportMetadata.ReportID, msgSize); err != nil {
864 log.Errorx("sending dmarc error reports", err)
865 metricReportError.Inc()
866 }
867 }
868
869 // Regardless of whether we queued a report, we are not going to keep the
870 // evaluations around. Though this can be overridden if tempError is set.
871 // ../rfc/7489:1785
872
873 return true, nil
874}
875
876func composeAggregateReport(ctx context.Context, log *mlog.Log, mf *os.File, fromAddr smtp.Address, recipients []message.NameAddress, subject, text, filename string, reportXMLGzipFile *os.File) (msgPrefix string, has8bit, smtputf8 bool, messageID string, rerr error) {
877 xc := message.NewComposer(mf, 100*1024*1024)
878 defer func() {
879 x := recover()
880 if x == nil {
881 return
882 }
883 if err, ok := x.(error); ok && errors.Is(err, message.ErrCompose) {
884 rerr = err
885 return
886 }
887 panic(x)
888 }()
889
890 // We only use smtputf8 if we have to, with a utf-8 localpart. For IDNA, we use ASCII domains.
891 for _, a := range recipients {
892 if a.Address.Localpart.IsInternational() {
893 xc.SMTPUTF8 = true
894 break
895 }
896 }
897
898 xc.HeaderAddrs("From", []message.NameAddress{{Address: fromAddr}})
899 xc.HeaderAddrs("To", recipients)
900 xc.Subject(subject)
901 messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(xc.SMTPUTF8))
902 xc.Header("Message-Id", messageID)
903 xc.Header("Date", time.Now().Format(message.RFC5322Z))
904 xc.Header("User-Agent", "mox/"+moxvar.Version)
905 xc.Header("MIME-Version", "1.0")
906
907 // Multipart message, with a text/plain and the report attached.
908 mp := multipart.NewWriter(xc)
909 xc.Header("Content-Type", fmt.Sprintf(`multipart/mixed; boundary="%s"`, mp.Boundary()))
910 xc.Line()
911
912 // Textual part, just mentioning this is a DMARC report.
913 textBody, ct, cte := xc.TextPart(text)
914 textHdr := textproto.MIMEHeader{}
915 textHdr.Set("Content-Type", ct)
916 textHdr.Set("Content-Transfer-Encoding", cte)
917 textp, err := mp.CreatePart(textHdr)
918 xc.Checkf(err, "adding text part to message")
919 _, err = textp.Write(textBody)
920 xc.Checkf(err, "writing text part")
921
922 // DMARC report as attachment.
923 ahdr := textproto.MIMEHeader{}
924 ahdr.Set("Content-Type", "application/gzip")
925 ahdr.Set("Content-Transfer-Encoding", "base64")
926 cd := mime.FormatMediaType("attachment", map[string]string{"filename": filename})
927 ahdr.Set("Content-Disposition", cd)
928 ap, err := mp.CreatePart(ahdr)
929 xc.Checkf(err, "adding dmarc aggregate report to message")
930 wc := moxio.Base64Writer(ap)
931 _, err = io.Copy(wc, &moxio.AtReader{R: reportXMLGzipFile})
932 xc.Checkf(err, "adding attachment")
933 err = wc.Close()
934 xc.Checkf(err, "flushing attachment")
935
936 err = mp.Close()
937 xc.Checkf(err, "closing multipart")
938
939 xc.Flush()
940
941 msgPrefix = dkimSign(ctx, log, fromAddr, xc.SMTPUTF8, mf)
942
943 return msgPrefix, xc.Has8bit, xc.SMTPUTF8, messageID, nil
944}
945
946// Though this functionality is quite underspecified, we'll do our best to send our
947// an error report in case our report is too large for all recipients.
948// ../rfc/7489:1918
949func sendErrorReport(ctx context.Context, log *mlog.Log, db *bstore.DB, fromAddr smtp.Address, recipients []message.NameAddress, reportDomain dns.Domain, reportID string, reportMsgSize int64) error {
950 log.Debug("no reporting addresses willing to accept report given size, queuing short error message")
951
952 msgf, err := store.CreateMessageTemp("dmarcreportmsg-out")
953 if err != nil {
954 return fmt.Errorf("creating temporary message file for outgoing dmarc error report: %v", err)
955 }
956 defer store.CloseRemoveTempFile(log, msgf, "outgoing dmarc error report message")
957
958 var recipientStrs []string
959 for _, rcpt := range recipients {
960 recipientStrs = append(recipientStrs, rcpt.Address.String())
961 }
962
963 subject := fmt.Sprintf("DMARC aggregate reporting error report for %s", reportDomain.ASCII)
964 // ../rfc/7489:1926
965 text := fmt.Sprintf(`Report-Date: %s
966Report-Domain: %s
967Report-ID: %s
968Report-Size: %d
969Submitter: %s
970Submitting-URI: %s
971`, time.Now().Format(message.RFC5322Z), reportDomain.ASCII, reportID, reportMsgSize, mox.Conf.Static.HostnameDomain.ASCII, strings.Join(recipientStrs, ","))
972 text = strings.ReplaceAll(text, "\n", "\r\n")
973
974 msgPrefix, has8bit, smtputf8, messageID, err := composeErrorReport(ctx, log, msgf, fromAddr, recipients, subject, text)
975 if err != nil {
976 return err
977 }
978
979 msgInfo, err := msgf.Stat()
980 if err != nil {
981 return fmt.Errorf("stat message with outgoing dmarc error report: %v", err)
982 }
983 msgSize := int64(len(msgPrefix)) + msgInfo.Size()
984
985 for _, rcpt := range recipients {
986 // If recipient is on suppression list, we won't queue the reporting message.
987 q := bstore.QueryDB[SuppressAddress](ctx, db)
988 q.FilterNonzero(SuppressAddress{ReportingAddress: rcpt.Address.Path().String()})
989 q.FilterGreater("Until", time.Now())
990 exists, err := q.Exists()
991 if err != nil {
992 return fmt.Errorf("querying suppress list: %v", err)
993 }
994 if exists {
995 log.Info("suppressing outgoing dmarc error report", mlog.Field("reportingaddress", rcpt.Address))
996 continue
997 }
998
999 qm := queue.MakeMsg(mox.Conf.Static.Postmaster.Account, fromAddr.Path(), rcpt.Address.Path(), has8bit, smtputf8, msgSize, messageID, []byte(msgPrefix), nil)
1000 // Don't try as long as regular deliveries, and stop before we would send the
1001 // delayed DSN. Though we also won't send that due to IsDMARCReport.
1002 qm.MaxAttempts = 5
1003 qm.IsDMARCReport = true
1004
1005 if err := queueAdd(ctx, log, &qm, msgf); err != nil {
1006 log.Errorx("queueing message with dmarc error report", err)
1007 metricReportError.Inc()
1008 } else {
1009 log.Debug("dmarc error report queued", mlog.Field("recipient", rcpt))
1010 metricReport.Inc()
1011 }
1012 }
1013 return nil
1014}
1015
1016func composeErrorReport(ctx context.Context, log *mlog.Log, mf *os.File, fromAddr smtp.Address, recipients []message.NameAddress, subject, text string) (msgPrefix string, has8bit, smtputf8 bool, messageID string, rerr error) {
1017 xc := message.NewComposer(mf, 100*1024*1024)
1018 defer func() {
1019 x := recover()
1020 if x == nil {
1021 return
1022 }
1023 if err, ok := x.(error); ok && errors.Is(err, message.ErrCompose) {
1024 rerr = err
1025 return
1026 }
1027 panic(x)
1028 }()
1029
1030 // We only use smtputf8 if we have to, with a utf-8 localpart. For IDNA, we use ASCII domains.
1031 for _, a := range recipients {
1032 if a.Address.Localpart.IsInternational() {
1033 xc.SMTPUTF8 = true
1034 break
1035 }
1036 }
1037
1038 xc.HeaderAddrs("From", []message.NameAddress{{Address: fromAddr}})
1039 xc.HeaderAddrs("To", recipients)
1040 xc.Header("Subject", subject)
1041 messageID = fmt.Sprintf("<%s>", mox.MessageIDGen(xc.SMTPUTF8))
1042 xc.Header("Message-Id", messageID)
1043 xc.Header("Date", time.Now().Format(message.RFC5322Z))
1044 xc.Header("User-Agent", "mox/"+moxvar.Version)
1045 xc.Header("MIME-Version", "1.0")
1046
1047 textBody, ct, cte := xc.TextPart(text)
1048 xc.Header("Content-Type", ct)
1049 xc.Header("Content-Transfer-Encoding", cte)
1050 xc.Line()
1051 _, err := xc.Write(textBody)
1052 xc.Checkf(err, "writing text")
1053
1054 xc.Flush()
1055
1056 msgPrefix = dkimSign(ctx, log, fromAddr, smtputf8, mf)
1057
1058 return msgPrefix, xc.Has8bit, xc.SMTPUTF8, messageID, nil
1059}
1060
1061func dkimSign(ctx context.Context, log *mlog.Log, fromAddr smtp.Address, smtputf8 bool, mf *os.File) string {
1062 // Add DKIM-Signature headers if we have a key for (a higher) domain than the from
1063 // address, which is a host name. A signature will only be useful with higher-level
1064 // domains if they have a relaxed dkim check (which is the default). If the dkim
1065 // check is strict, there is no harm, there will simply not be a dkim pass.
1066 fd := fromAddr.Domain
1067 var zerodom dns.Domain
1068 for fd != zerodom {
1069 confDom, ok := mox.Conf.Domain(fd)
1070 if len(confDom.DKIM.Sign) > 0 {
1071 dkimHeaders, err := dkim.Sign(ctx, fromAddr.Localpart, fd, confDom.DKIM, smtputf8, mf)
1072 if err != nil {
1073 log.Errorx("dkim-signing dmarc report, continuing without signature", err)
1074 metricReportError.Inc()
1075 return ""
1076 }
1077 return dkimHeaders
1078 } else if ok {
1079 return ""
1080 }
1081
1082 var nfd dns.Domain
1083 _, nfd.ASCII, _ = strings.Cut(fd.ASCII, ".")
1084 _, nfd.Unicode, _ = strings.Cut(fd.Unicode, ".")
1085 fd = nfd
1086 }
1087 return ""
1088}
1089
1090// SuppressAdd adds an address to the suppress list.
1091func SuppressAdd(ctx context.Context, ba *SuppressAddress) error {
1092 db, err := evalDB(ctx)
1093 if err != nil {
1094 return err
1095 }
1096
1097 return db.Insert(ctx, ba)
1098}
1099
1100// SuppressList returns all reporting addresses on the suppress list.
1101func SuppressList(ctx context.Context) ([]SuppressAddress, error) {
1102 db, err := evalDB(ctx)
1103 if err != nil {
1104 return nil, err
1105 }
1106
1107 return bstore.QueryDB[SuppressAddress](ctx, db).SortDesc("ID").List()
1108}
1109
1110// SuppressRemove removes a reporting address record from the suppress list.
1111func SuppressRemove(ctx context.Context, id int64) error {
1112 db, err := evalDB(ctx)
1113 if err != nil {
1114 return err
1115 }
1116
1117 return db.Delete(ctx, &SuppressAddress{ID: id})
1118}
1119
1120// SuppressUpdate updates the until field of a reporting address record.
1121func SuppressUpdate(ctx context.Context, id int64, until time.Time) error {
1122 db, err := evalDB(ctx)
1123 if err != nil {
1124 return err
1125 }
1126
1127 ba := SuppressAddress{ID: id}
1128 err = db.Get(ctx, &ba)
1129 if err != nil {
1130 return err
1131 }
1132 ba.Until = until
1133 return db.Update(ctx, &ba)
1134}
1135