19	"golang.org/x/exp/maps"
 
21	"github.com/mjl-/mox/config"
 
22	"github.com/mjl-/mox/message"
 
23	"github.com/mjl-/mox/metrics"
 
24	"github.com/mjl-/mox/mox-"
 
25	"github.com/mjl-/mox/store"
 
28// todo: add option to trust imported messages, causing us to look at Authentication-Results and Received-SPF headers and add eg verified spf/dkim/dmarc domains to our store, to jumpstart reputation.
 
// importCommonHelp is help text about importing mbox/maildir archives. It is
// spliced into the help text of import commands, as cmdImportMaildir does.
30const importCommonHelp = `The mbox/maildir archive is accessed and imported by the running mox process, so
 
31it must have access to the archive files. The default suggested systemd service
 
32file isolates mox from most of the file system, with only the "data/" directory
 
33accessible, so you may want to put the mbox/maildir archive files in a
 
34directory like "data/import/" to make it available to mox.
 
36By default, messages will train the junk filter based on their flags and, if
 
37"automatic junk flags" configuration is set, based on mailbox naming.
 
39If the destination mailbox is the Sent mailbox, the recipients of the messages
 
40are added to the message metadata, causing later incoming messages from these
 
41recipients to be accepted, unless other reputation signals prevent that.
 
43Users can also import mailboxes/messages through the account web page by
 
44uploading a zip or tgz file with mbox and/or maildirs.
 
46Messages are imported even if already present. Importing messages twice will
 
47result in duplicate messages.
 
// cmdImportMaildir configures the "import maildir" command: it sets the
// parameter usage ("accountname mailboxname maildir") and the help text, which
// embeds importCommonHelp. It then passes the three arguments to ctlcmdImport
// with mbox=false, using the ctl returned by xctl().
// NOTE(review): the argument parsing that produces args is not visible in this
// view; presumably it requires exactly three arguments - confirm.
50func cmdImportMaildir(c *cmd) {
 
51	c.params = "accountname mailboxname maildir"
 
52	c.help = `Import a maildir into an account.
 
54` + importCommonHelp + `
 
55Mailbox flags, like "seen", "answered", will be imported. An optional
 
56dovecot-keywords file can specify additional flags, like Forwarded/Junk/NotJunk.
 
63	ctlcmdImport(xctl(), false, args[0], args[1], args[2])
 
// cmdImportMbox configures the "import mbox" command: it sets the parameter
// usage ("accountname mailboxname mbox") and the help text, then passes the
// three arguments to ctlcmdImport with mbox=true, using the ctl returned by
// xctl().
// NOTE(review): the argument parsing that produces args is not visible in this
// view; presumably it mirrors cmdImportMaildir - confirm.
66func cmdImportMbox(c *cmd) {
 
67	c.params = "accountname mailboxname mbox"
 
68	c.help = `Import an mbox into an account.
 
70Using mbox is not recommended, maildir is a better defined format.
 
78	ctlcmdImport(xctl(), true, args[0], args[1], args[2])
 
// cmdXImportMaildir configures the command that imports a maildir by directly
// accessing the data directory. Its first parameter is an account directory
// ("accountdir") rather than an account name.
// NOTE(review): the call that performs the import is not visible in this view;
// presumably it delegates to xcmdXImport with mbox=false - confirm.
81func cmdXImportMaildir(c *cmd) {
 
83	c.params = "accountdir mailboxname maildir"
 
84	c.help = `Import a maildir into an account by directly accessing the data directory.
 
87See "mox help import maildir" for details.
 
// cmdXImportMbox configures the command that imports an mbox by directly
// accessing the data directory. Its first parameter is an account directory
// ("accountdir") rather than an account name.
// NOTE(review): the call that performs the import is not visible in this view;
// presumably it delegates to xcmdXImport with mbox=true - confirm.
92func cmdXImportMbox(c *cmd) {
 
94	c.params = "accountdir mailboxname mbox"
 
95	c.help = `Import an mbox into an account by directly accessing the data directory.
 
97See "mox help import mbox" for details.
 
// xcmdXImport runs an import against an account directory. It builds a minimal
// in-memory mox configuration for the account, then runs both ends of the ctl
// import protocol in this process: servectlcmd serves one end of an in-memory
// pipe in a goroutine, and ctlcmdImport drives the other end.
//
// mbox selects mbox (true) or maildir (false) import and is passed through to
// ctlcmdImport. The account directory, mailbox name and source path come from
// args[0], args[1] and args[2].
// NOTE(review): how args is derived from c is not visible in this view.
102func xcmdXImport(mbox bool, c *cmd) {
 
108	accountdir := args[0]
 
	// The account name is the last path element of the account directory.
109	account := filepath.Base(accountdir)
 
111	// Set up the mox config so the account can be opened.
 
112	if filepath.Base(filepath.Dir(accountdir)) != "accounts" {
 
113		log.Fatalf("accountdir must be of the form .../accounts/<name>")
 
	// The data directory is the parent of the "accounts" directory.
116	mox.Conf.Static.DataDir, err = filepath.Abs(filepath.Dir(filepath.Dir(accountdir)))
 
117	xcheckf(err, "making absolute datadir")
 
118	mox.ConfigStaticPath = "fake.conf"
 
119	mox.Conf.DynamicLastCheck = time.Now().Add(time.Hour) // Silence errors about config file.
 
120	mox.Conf.Dynamic.Accounts = map[string]config.Account{
 
	// Call store.Switchboard now and defer the function it returns until this
	// function returns.
123	defer store.Switchboard()()
 
	// Connect a client ctl and a server ctl through an in-memory pipe.
125	cconn, sconn := net.Pipe()
 
126	clientctl := ctl{conn: cconn, r: bufio.NewReader(cconn), log: c.log}
 
127	serverctl := ctl{conn: sconn, r: bufio.NewReader(sconn), log: c.log}
 
128	go servectlcmd(context.Background(), &serverctl, func() {})
 
130	ctlcmdImport(&clientctl, mbox, account, args[1], args[2])
 
// ctlcmdImport is the client side of the import protocol over ctl. It writes
// the command name ("importmbox" or "importmaildir"), then reads response
// lines: lines starting with "progress " are echoed to stderr as a running
// count, and an unexpected line where "ok" is expected is fatal. On success it
// prints the final count to stderr.
//
// account, mailbox and src name the destination account, the destination
// mailbox and the mbox file or maildir directory to import.
// NOTE(review): the branch on mbox, the writes of account/mailbox/src, and the
// read loop are not visible in this view - confirm against the full file.
133func ctlcmdImport(ctl *ctl, mbox bool, account, mailbox, src string) {
 
135		ctl.xwrite("importmbox")
 
137		ctl.xwrite("importmaildir")
 
	// "Inbox" in any casing is handled specially; the body of this branch is
	// not visible here (presumably it normalizes the name to "Inbox").
140	if strings.EqualFold(mailbox, "Inbox") {
 
146	fmt.Fprintln(os.Stderr, "importing...")
 
149		if strings.HasPrefix(line, "progress ") {
 
			// The text after the prefix is the progress count.
150			n := line[len("progress "):]
 
151			fmt.Fprintf(os.Stderr, "%s...\n", n)
 
155			log.Fatalf("import, expected ok, got %q", line)
 
160	fmt.Fprintf(os.Stderr, "%s imported\n", count)
 
// importctl is the server side of the import protocol: it imports an mbox file
// or maildir directory into a mailbox of an account and reports over ctl.
//
// As far as visible in this view, it proceeds as follows:
//   - Reads the account and mailbox names from ctl and logs the request.
//   - Opens the account and waits on a.ThreadingWait.
//   - Opens the source (an mbox file, or the "new" and "cur" subdirectories of
//     a maildir) and wraps it in a store.MsgSource reader.
//   - Begins a database transaction on the account.
//   - For each message from the reader: checks the total message size quota,
//     collects keywords, parses the message, prepares threading fields, sets a
//     Received time if missing, trains the junk filter if one is open, and
//     delivers the message through a.DeliverMessage.
//   - Writes "progress <n>" lines to ctl while importing.
//   - Afterwards assigns threads, updates mailbox counts and keywords, adds the
//     imported size to the account total, commits, broadcasts the changes,
//     closes the account and writes the final count to ctl.
//
// On failure, the IDs of messages delivered so far are walked via
// a.MessagePath and the error is reported with ctl.xerror.
//
// mbox selects mbox (true) or maildir (false) as the source format.
// NOTE(review): many lines (if/else headers, defers, the recover handler, the
// read of src) are not visible in this view; the summary above is limited to
// what the visible lines show.
163func importctl(ctx context.Context, ctl *ctl, mbox bool) {
 
165	> "importmaildir" or "importmbox"
 
168	> src (mbox file or maildir directory)
 
170	< "progress" count (zero or more times, once for every 1000 messages)
 
171	< "ok" when done, or error
 
172	< count (of total imported messages, only if not error)
 
174	account := ctl.xread()
 
175	mailbox := ctl.xread()
 
182	ctl.log.Info("importing messages",
 
183		slog.String("kind", kind),
 
184		slog.String("account", account),
 
185		slog.String("mailbox", mailbox),
 
186		slog.String("source", src))
 
190	var mdnewf, mdcurf *os.File
 
191	var msgreader store.MsgSource
 
193	// Open account, creating a database file if it doesn't exist yet. It must be known
 
194	// in the configuration file.
 
195	a, err := store.OpenAccount(ctl.log, account)
 
196	ctl.xcheck(err, "opening account")
 
200			ctl.log.Check(err, "closing account after import")
 
204	err = a.ThreadingWait(ctl.log)
 
205	ctl.xcheck(err, "waiting for account thread upgrade")
 
210			ctl.log.Check(err, "closing mbox file after import")
 
213			err := mdnewf.Close()
 
214			ctl.log.Check(err, "closing maildir new after import")
 
217			err := mdcurf.Close()
 
218			ctl.log.Check(err, "closing maildir cur after import")
 
222	// Messages don't always have a junk flag set. We'll assume anything in a mailbox
 
223	// starting with junk or spam is junk mail.
 
225	// First check if we can access the mbox/maildir.
 
226	// Mox needs to be able to access those files, the user running the import command
 
227	// may be a different user who can access the files.
 
229		mboxf, err = os.Open(src)
 
230		ctl.xcheck(err, "open mbox file")
 
231		msgreader = store.NewMboxReader(ctl.log, store.CreateMessageTemp, src, mboxf)
 
233		mdnewf, err = os.Open(filepath.Join(src, "new"))
 
234		ctl.xcheck(err, "open subdir new of maildir")
 
235		mdcurf, err = os.Open(filepath.Join(src, "cur"))
 
236		ctl.xcheck(err, "open subdir cur of maildir")
 
237		msgreader = store.NewMaildirReader(ctl.log, store.CreateMessageTemp, mdnewf, mdcurf)
 
240	tx, err := a.DB.Begin(ctx, true)
 
241	ctl.xcheck(err, "begin transaction")
 
245			ctl.log.Check(err, "rolling back transaction")
 
249	// All preparations done. Good to go.
 
252	// We will be delivering messages. If we fail halfway, we need to remove the created msg files.
 
253	var deliveredIDs []int64
 
262			ctl.log.Error("import error", slog.String("panic", fmt.Sprintf("%v", x)))
 
264			metrics.PanicInc(metrics.Import)
 
266			ctl.log.Error("import error")
 
		// Error path: walk the messages delivered so far by their on-disk path.
269		for _, id := range deliveredIDs {
 
270			p := a.MessagePath(id)
 
272			ctl.log.Check(err, "closing message file after import error", slog.String("path", p))
 
275		ctl.xerror(fmt.Sprintf("import error: %v", x))
 
278	var changes []store.Change
 
280	var modseq store.ModSeq // Assigned on first delivered messages, used for all messages.
 
282	xdeliver := func(m *store.Message, mf *os.File) {
 
283		// todo: possibly set dmarcdomain to the domain of the from address? at least for non-spams that have been seen. otherwise user would start without any reputations. the assumption would be that the user has accepted email and deemed it legit, coming from the indicated sender.
 
287		const nothreads = true
 
288		const updateDiskUsage = false
 
289		err := a.DeliverMessage(ctl.log, tx, m, mf, sync, notrain, nothreads, updateDiskUsage)
 
290		ctl.xcheck(err, "delivering message")
 
291		deliveredIDs = append(deliveredIDs, m.ID)
 
292		ctl.log.Debug("delivered message", slog.Int64("id", m.ID))
 
293		changes = append(changes, m.ChangeAddUID())
 
296	// todo: one goroutine for reading messages, one for parsing the message, one adding to database, one for junk filter training.
 
299		// Ensure mailbox exists.
 
301		mb, changes, err = a.MailboxEnsure(tx, mailbox, true)
 
302		ctl.xcheck(err, "ensuring mailbox exists")
 
304		// We ensure keywords in messages make it to the mailbox as well.
 
305		mailboxKeywords := map[string]bool{}
 
307		jf, _, err := a.OpenJunkFilter(ctx, ctl.log)
 
308		if err != nil && !errors.Is(err, store.ErrNoJunkFilter) {
 
309			ctl.xcheck(err, "open junk filter")
 
314				ctl.xcheck(err, "close junk filter")
 
320		maxSize := a.QuotaMessageSize()
 
322		du := store.DiskUsage{ID: 1}
 
324		ctl.xcheck(err, "get disk usage")
 
326		process := func(m *store.Message, msgf *os.File, origPath string) {
 
327			defer store.CloseRemoveTempFile(ctl.log, msgf, "message to import")
 
330			if maxSize > 0 && du.MessageSize+addSize > maxSize {
 
331				ctl.xcheck(fmt.Errorf("account over maximum total message size %d", maxSize), "checking quota")
 
334			for _, kw := range m.Keywords {
 
335				mailboxKeywords[kw] = true
 
337			mb.Add(m.MailboxCounts())
 
339			// Parse message and store parsed information for later fast retrieval.
 
340			p, err := message.EnsurePart(ctl.log.Logger, false, msgf, m.Size)
 
342				ctl.log.Infox("parsing message, continuing", err, slog.String("path", origPath))
 
344			m.ParsedBuf, err = json.Marshal(p)
 
345			ctl.xcheck(err, "marshal parsed message structure")
 
347			// Set fields needed for future threading. By doing it now, DeliverMessage won't
 
348			// have to parse the Part again.
 
349			p.SetReaderAt(store.FileMsgReader(m.MsgPrefix, msgf))
 
350			m.PrepareThreading(ctl.log, &p)
 
352			if m.Received.IsZero() {
 
353				if p.Envelope != nil && !p.Envelope.Date.IsZero() {
 
354					m.Received = p.Envelope.Date
 
356					m.Received = time.Now()
 
360			// We set the flags that Deliver would set now and train ourselves. This prevents
 
361			// Deliver from training, which would open the junk filter, change it, and write it
 
362			// back to disk, for each message (slow).
 
363			m.JunkFlagsForMailbox(mb, conf)
 
364			if jf != nil && m.NeedsTraining() {
 
365				if words, err := jf.ParseMessage(p); err != nil {
 
366					ctl.log.Infox("parsing message for updating junk filter", err, slog.String("parse", ""), slog.String("path", origPath))
 
368					err = jf.Train(ctx, !m.Junk, words)
 
369					ctl.xcheck(err, "training junk filter")
 
370					m.TrainedJunk = &m.Junk
 
376				modseq, err = a.NextModSeq(tx)
 
377				ctl.xcheck(err, "assigning next modseq")
 
381			m.MailboxOrigID = mb.ID
 
388				ctl.xwrite(fmt.Sprintf("progress %d", n))
 
393			m, msgf, origPath, err := msgreader.Next()
 
397			ctl.xcheck(err, "reading next message")
 
399			process(m, msgf, origPath)
 
		// Threads are assigned in one pass after delivery, starting from the
		// first delivered message ID (delivery itself ran with nothreads set).
403		if len(deliveredIDs) > 0 {
 
404			err = a.AssignThreads(ctx, ctl.log, tx, deliveredIDs[0], 0, io.Discard)
 
405			ctl.xcheck(err, "assigning messages to threads")
 
408		// Get mailbox again, uidnext is likely updated.
 
409		mc := mb.MailboxCounts
 
411		ctl.xcheck(err, "get mailbox")
 
412		mb.MailboxCounts = mc
 
414		// If there are any new keywords, update the mailbox.
 
416		mb.Keywords, mbKwChanged = store.MergeKeywords(mb.Keywords, maps.Keys(mailboxKeywords))
 
418			changes = append(changes, mb.ChangeKeywords())
 
422		ctl.xcheck(err, "updating message counts and keywords in mailbox")
 
423		changes = append(changes, mb.ChangeCounts())
 
425		err = a.AddMessageSize(ctl.log, tx, addSize)
 
		// NOTE(review): every other check visible in this function uses
		// ctl.xcheck; the bare xcheckf here looks inconsistent - confirm whether
		// it should be ctl.xcheck so the error is reported over ctl.
426		xcheckf(err, "updating total message size")
 
429		ctl.xcheck(err, "commit")
 
431		ctl.log.Info("delivered messages through import", slog.Int("count", len(deliveredIDs)))
 
434		store.BroadcastChanges(a, changes)
 
438	ctl.xcheck(err, "closing account")
 
442	ctl.xwrite(fmt.Sprintf("%d", n))