diff --git a/Lib/imaplib.py b/Lib/imaplib.py index fa4c0f8f62361a1..cbeaf381e99f0c7 100644 --- a/Lib/imaplib.py +++ b/Lib/imaplib.py @@ -23,8 +23,10 @@ __version__ = "2.58" import binascii, errno, random, re, socket, subprocess, sys, time, calendar +from collections import namedtuple from datetime import datetime, timezone, timedelta from io import DEFAULT_BUFFER_SIZE +from typing import Tuple try: import ssl @@ -131,6 +133,10 @@ _Untagged_status = br'\* (?P\d+) (?P[A-Z-]+)( (?P.*))?' +# Store capabilities present in the current server +# Add additional ones as needed +AvailableCapabilities = namedtuple('Capas', ['MOVE', 'UIDPLUS']) + class IMAP4: @@ -194,6 +200,7 @@ def __init__(self, host='', port=IMAP4_PORT, timeout=None): self.continuation_response = '' # Last continuation response self.is_readonly = False # READ-ONLY desired state self.tagnum = 0 + self._features_available = AvailableCapabilities(False, False) self._tls_established = False self._mode_ascii() @@ -611,6 +618,9 @@ def login(self, user, password): if typ != 'OK': raise self.error(dat[-1]) self.state = 'AUTH' + + self._get_capabilities() + return typ, dat @@ -655,6 +665,44 @@ def lsub(self, directory='""', pattern='*'): typ, dat = self._simple_command(name, directory, pattern) return self._untagged_response(typ, dat, name) + def move_messages(self, target: str, msgs: str) -> Tuple[str, str]: + """ + Higher level command to move message inside of one account. + Following RFC-6851. + + :param str target: name of the folder to move messages into + :param str messages: UID sequence set (according to + https://tools.ietf.org/html/rfc3501#section-9) + :return: type, data tuple; type is 'OK' or something else in + case of failure. + :rtype: tuple [str, str] + """ + if self._features_available.MOVE: + typ, dat = self._simple_command('UID', 'MOVE', + msgs, target) + return self._untagged_response(typ, dat, 'MOVE') + elif self._features_available.UIDPLUS: + ok, data = self.uid('COPY', f'{msgs} {target}') + if ok != 'OK': + return ok, f'Cannot copy messages {msgs} to folder {target}' + ok, data = self.uid('STORE', + f'+FLAGS.SILENT (\\DELETED) {msgs}') + if ok != 'OK': + return ok, f'Cannot delete messages {msgs}.' + ok, data = self.uid('EXPUNGE', msgs) + if ok != 'OK': + return ok, f'Cannot expunge messages {msgs}.' + return ok, f'Messages {msgs} moved to {target}.' + else: + ok, data = self.uid('COPY', f'{msgs} {target}') + if ok != 'OK': + return ok, f'Cannot copy messages {msgs} to folder {target}' + ok, data = self.uid('STORE', + f'+FLAGS.SILENT (\\DELETED) {msgs}') + if ok != 'OK': + return ok, f'Cannot delete messages {msgs}.' + return ok, f'Messages {msgs} moved to {target}.' + def myrights(self, mailbox): """Show my ACLs for a mailbox (i.e. the rights that I have on mailbox). @@ -1058,11 +1106,15 @@ def _command_complete(self, name, tag): def _get_capabilities(self): typ, dat = self.capability() + if typ != 'OK': + raise self.error(dat[-1]) if dat == [None]: raise self.error('no CAPABILITY response from server') dat = str(dat[-1], self._encoding) dat = dat.upper() self.capabilities = tuple(dat.split()) + self._features_available = AvailableCapabilities._make( + ['MOVE' in dat, 'UIDPLUS' in dat]) def _get_response(self): diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index c2b935f58164e5a..b2b40893ba5c148 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -538,6 +538,59 @@ def test_unselect(self): self.assertEqual(data[0], b'Returned to authenticated state. (Success)') self.assertEqual(client.state, 'AUTH') + def test_move_messages(self): + from email.message import EmailMessage + + class MoveServer(SimpleIMAPHandler): + capabilities = 'ENABLE UTF8=ACCEPT' + + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + + def cmd_APPEND(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'okay') + + def cmd_CREATE(self, tag, args): + self._send_textline('* CREATE ' + args[0]) + self._send_tagged(tag, 'OK', 'okay') + + def cmd_MOVE(self, tag, args): + self._send_textline('* MOVE ' + args[0]) + self._send_tagged(tag, 'OK', 'okay') + + def cmd_SELECT(self, tag, args): + self.server.is_selected = True + self._send_textline('* 2 EXISTS') + self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + + client, _ = self._setup(MoveServer) + typ, data = client.login('user', 'pass') + typ, data = client.create('source') + typ, data = client.create('target') + typ, data = client.select('source') + + msg = EmailMessage() + msg['Subject'] = 'Test message' + msg['From'] = 'test@example.com' + msg['To'] = 'testee@example.com' + msg.set_content('This is a testing message') + print(f'=========================\nmsg:\n{msg}=============') + typ, data = client.append('source', None, None, msg.as_bytes()) + + typ, data = client.search(None, 'ALL') + typ, data = client.move_messages('target', data[0]) + self.assertEqual(typ, 'OK') + typ, data = client.search(None, 'target', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(int(data[0]), 1) + class NewIMAPTests(NewIMAPTestsMixin, unittest.TestCase): imap_class = imaplib.IMAP4 diff --git a/Misc/NEWS.d/next/Library/2020-03-19-11-39-00.bpo-33327.taAn2e.rst b/Misc/NEWS.d/next/Library/2020-03-19-11-39-00.bpo-33327.taAn2e.rst new file mode 100644 index 000000000000000..ecafca874d89719 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-03-19-11-39-00.bpo-33327.taAn2e.rst @@ -0,0 +1,4 @@ +Provides method :meth:`.move_messages()` of IMAP4 object, which +moves messages among two IMAP folders. This change introduces and +uses caching of the server’s capabilities in +:attr:`__features_available` internal attribute.