From 78cf5e8d8ee157183266801eb3e2a2999b307982 Mon Sep 17 00:00:00 2001 From: Serhiy Storchaka Date: Thu, 2 Jul 2026 13:14:44 +0300 Subject: [PATCH] gh-66335: Add tests for imaplib command methods (GH-152872) Add coverage tests for the IMAP4 command methods and their UID variants (SELECT, CREATE, COPY, STORE, FETCH, SEARCH, SORT, THREAD, DELETE, RENAME, SUBSCRIBE, UNSUBSCRIBE, LIST, STATUS, the ACL and quota commands, ANNOTATION, PROXYAUTH and ENABLE), plus test helpers (splitargs, parse_sequence_set, make_simple_handler) and per-command argument capture in the test server. (cherry picked from commit a50b089b7c46e3068d9177b5f18ac767a31a18cd) Co-authored-by: Serhiy Storchaka Co-authored-by: Claude Opus 4.8 (1M context) --- Lib/test/test_imaplib.py | 672 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 660 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index 67bf47ad41a1ace..e797a625014603e 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -26,6 +26,68 @@ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem") CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem") +_quoted_string = re.compile(r'"(?:[^"\\]|\\.)*"') + + +def splitargs(line): + # Split a command line into IMAP arguments: quoted strings, balanced + # (possibly nested) parenthesized lists, and bare atoms. + args = [] + i, n = 0, len(line) + while i < n: + if line[i] == ' ': + i += 1 + continue + start = i + depth = 0 + while i < n and (depth or line[i] != ' '): + c = line[i] + if c == '"': + i = _quoted_string.match(line, i).end() + continue + elif c == '(': + depth += 1 + elif c == ')': + depth -= 1 + i += 1 + args.append(line[start:i]) + return args + + +def parse_seq_number(s, maxmsg): + if s == '*': + return maxmsg + return int(s) + + +def parse_sequence_set(arg, maxmsg): + for s in arg.split(','): + if ':' in s: + lo, hi = s.split(':') + lo = parse_seq_number(lo, maxmsg) + hi = parse_seq_number(hi, maxmsg) + yield from range(min(lo, hi), max(lo, hi) + 1) + else: + yield parse_seq_number(s, maxmsg) + + +def make_simple_handler(command, untagged_response=(), + completed=None): + if completed is None: + completed = f'{command} completed' + def cmd(self, tag, args): + for msg in untagged_response: + self._send_textline(msg) + self._send_tagged(tag, 'OK', completed) + cmd.__name__ = 'cmd_' + command + class Handler(SimpleIMAPHandler): + pass + Handler.__name__ = command.title() + 'Handler' + setattr(Handler, cmd.__name__, cmd) + Handler.__qualname__ = Handler.__name__ + cmd.__qualname__ = Handler.__qualname__ + '.' + cmd.__name__ + return Handler + class TestImaplib(unittest.TestCase): @@ -119,7 +181,7 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): def setup(self): super().setup() - self.server.is_selected = False + self.server.is_selected = None self.server.logged = None def _send(self, message): @@ -167,10 +229,11 @@ def handle(self): except StopIteration: self.continuation = None continue - splitline = line.decode('ASCII').split() + splitline = splitargs(line.decode().removesuffix('\r\n')) tag = splitline[0] cmd = splitline[1] args = splitline[2:] + self.server.args = args if hasattr(self, 'cmd_' + cmd): continuation = getattr(self, 'cmd_' + cmd)(tag, args) @@ -197,13 +260,18 @@ def cmd_LOGIN(self, tag, args): self._send_tagged(tag, 'OK', 'LOGIN completed') def cmd_SELECT(self, tag, args): - self.server.is_selected = True + self.server.is_selected = args self._send_line(b'* 2 EXISTS') self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + def cmd_EXAMINE(self, tag, args): + self.server.is_selected = args + self._send_line(b'* 2 EXISTS') + self._send_tagged(tag, 'OK', '[READ-ONLY] EXAMINE completed.') + def cmd_UNSELECT(self, tag, args): - if self.server.is_selected: - self.server.is_selected = False + if self.server.is_selected is not None: + self.server.is_selected = None self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') else: self._send_tagged(tag, 'BAD', 'No mailbox selected') @@ -745,18 +813,20 @@ def test_logout(self): self.assertEqual(client.state, 'LOGOUT') def test_lsub(self): - class LsubCmd(SimpleIMAPHandler): - def cmd_LSUB(self, tag, args): - self._send_textline('* LSUB () "." directoryA') - return self._send_tagged(tag, 'OK', 'LSUB completed') - client, _ = self._setup(LsubCmd) + client, server = self._setup(make_simple_handler('LSUB', + ['* LSUB () "." directoryA', '* LSUB () "." directoryB'])) client.login('user', 'pass') typ, data = client.lsub() self.assertEqual(typ, 'OK') - self.assertEqual(data[0], b'() "." directoryA') + self.assertEqual(data, [b'() "." directoryA', b'() "." directoryB']) + self.assertEqual(server.args, ['""', '*']) + + typ, data = client.lsub('~/Mail/', '%') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['~/Mail/', '%']) def test_unselect(self): - client, _ = self._setup(SimpleIMAPHandler) + client, server = self._setup(SimpleIMAPHandler) client.login('user', 'pass') typ, data = client.select() self.assertEqual(typ, 'OK') @@ -766,6 +836,584 @@ def test_unselect(self): self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'Returned to authenticated state. (Success)') self.assertEqual(client.state, 'AUTH') + self.assertIsNone(server.is_selected) + + def test_enable(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'IMAP4rev1 ID LITERAL+ ENABLE X-GOOD-IDEA' + def cmd_ENABLE(self, tag, args): + capabilities = self.capabilities.split() + for arg in args: + if arg in capabilities: + self._send_textline('* ENABLED ' + arg) + self._send_tagged(tag, 'OK', 'foo') + + client, server = self._setup(EnableHandler) + client.login('user', 'pass') + code, data = client.enable('CONDSTORE X-GOOD-IDEA') + self.assertEqual(code, 'OK') + self.assertEqual(data, [b'foo']) + self.assertEqual(server.args, ['CONDSTORE', 'X-GOOD-IDEA']) + + def test_select(self): + client, server = self._setup(SimpleIMAPHandler) + client.login('user', 'pass') + typ, data = client.select() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'2') + self.assertEqual(server.is_selected, ['INBOX']) + + typ, data = client.select('Archive') + self.assertEqual(typ, 'OK') + self.assertEqual(server.is_selected, ['Archive']) + + # readonly=True issues EXAMINE instead of SELECT + # (there is no separate examine() method). + typ, data = client.select(readonly=True) + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'2') + self.assertEqual(server.is_selected, ['INBOX']) + self.assertTrue(client.is_readonly) + + def test_expunge(self): + client, server = self._setup(make_simple_handler('EXPUNGE', + ['* 3 EXPUNGE', '* 3 EXPUNGE', '* 5 EXPUNGE', '* 8 EXPUNGE'])) + client.login('user', 'pass') + client.select() + typ, data = client.expunge() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'3', b'3', b'5', b'8']) + + def test_close(self): + client, server = self._setup(make_simple_handler('CLOSE')) + client.login('user', 'pass') + client.select() + typ, data = client.close() + self.assertEqual(typ, 'OK') + self.assertEqual(client.state, 'AUTH') + + def test_check(self): + client, server = self._setup(make_simple_handler('CHECK')) + client.login('user', 'pass') + client.select() + typ, data = client.check() + self.assertEqual(typ, 'OK') + + def test_noop(self): + client, server = self._setup(make_simple_handler('NOOP', + ['* 4 EXISTS'])) + client.login('user', 'pass') + typ, data = client.noop() + self.assertEqual(typ, 'OK') + # NOOP is used to pick up server-pushed untagged responses. + self.assertEqual(client.untagged_responses['EXISTS'], [b'4']) + + def test_namespace(self): + client, server = self._setup(make_simple_handler('NAMESPACE', + ['* NAMESPACE (("" "/")) NIL NIL'])) + client.login('user', 'pass') + typ, data = client.namespace() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'(("" "/")) NIL NIL') + + def test_capability(self): + client, server = self._setup(make_simple_handler('CAPABILITY', + ['* CAPABILITY IMAP4rev1 IDLE NAMESPACE'])) + client.login('user', 'pass') + typ, data = client.capability() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'IMAP4rev1 IDLE NAMESPACE']) + + def test_recent(self): + # recent() prods the server with NOOP if no RECENT was seen yet. + client, server = self._setup(make_simple_handler('NOOP', + ['* 5 RECENT'])) + client.login('user', 'pass') + typ, data = client.recent() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'5']) + + def test_response(self): + client, server = self._setup(SimpleIMAPHandler) + client.login('user', 'pass') + client.select() # the handler answers with '* 2 EXISTS' + typ, data = client.response('EXISTS') + self.assertEqual(typ, 'EXISTS') + self.assertEqual(data, [b'2']) + # The value is cleared once read. + typ, data = client.response('EXISTS') + self.assertEqual(data, [None]) + + def test_create(self): + client, server = self._setup(make_simple_handler('CREATE')) + client.login('user', 'pass') + typ, data = client.create('owatagusiam/') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/']) + + typ, data = client.create('owatagusiam/blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/blurdybloop']) + + def test_copy(self): + client, server = self._setup(make_simple_handler('COPY')) + client.login('user', 'pass') + client.select() + typ, data = client.copy('2:4', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'COPY completed']) + self.assertEqual(server.args, ['2:4', 'MEETING']) + + def test_uid_copy(self): + client, server = self._setup(make_simple_handler('UID', + completed='UID COPY completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('copy', '4827313:4828442', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [None]) + self.assertEqual(server.args, ['COPY', '4827313:4828442', 'MEETING']) + + def test_store(self): + client, server = self._setup(make_simple_handler('STORE', [ + r'* 2 FETCH (FLAGS (\Deleted \Seen))', + r'* 3 FETCH (FLAGS (\Deleted))', + r'* 4 FETCH (FLAGS (\Deleted \Flagged \Seen))', + ])) + client.login('user', 'pass') + client.select() + typ, data = client.store('2:4', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Deleted \Seen))', + br'3 (FLAGS (\Deleted))', + br'4 (FLAGS (\Deleted \Flagged \Seen))', + ]) + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + typ, data = client.store('2:4', '+FLAGS', r'\Deleted') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + def test_uid_store(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Deleted \Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Deleted) UID 4827943)', + r'* 25 FETCH (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ], 'UID STORE completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('store', '4827313:4828442', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Deleted \Seen) UID 4827313)', + br'24 (FLAGS (\Deleted) UID 4827943)', + br'25 (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['STORE', '4827313:4828442', '+FLAGS', r'(\Deleted)']) + + def test_fetch(self): + # The handler expands the requested sequence set and answers for + # exactly those messages, so the test exercises the round trip of + # the message set, not just a canned reply. + class FetchHandler(SimpleIMAPHandler): + messages = 4 + def cmd_SELECT(self, tag, args): + self.server.is_selected = args + self._send_line(b'* %d EXISTS' % self.messages) + self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + def cmd_FETCH(self, tag, args): + for n in parse_sequence_set(args[0], self.messages): + self._send_textline(r'* %d FETCH (FLAGS (\Seen))' % n) + self._send_tagged(tag, 'OK', 'FETCH completed') + + client, server = self._setup(FetchHandler) + client.login('user', 'pass') + client.select() + typ, data = client.fetch('2:4', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['2:4', '(FLAGS)']) + + # A comma-separated set with an open range up to '*'. + typ, data = client.fetch('1,3:*', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'1 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['1,3:*', '(FLAGS)']) + + # An item with nested parentheses is sent (and parsed) as a + # single argument. + typ, data = client.fetch('1', '(BODY[HEADER.FIELDS (DATE FROM)])') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'1 (FLAGS (\Seen))']) + self.assertEqual(server.args, ['1', '(BODY[HEADER.FIELDS (DATE FROM)])']) + + def test_uid_fetch(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Seen) UID 4827943)', + r'* 25 FETCH (FLAGS (\Seen) UID 4828442)', + ], 'UID FETCH completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('fetch', '4827313:4828442', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Seen) UID 4827313)', + br'24 (FLAGS (\Seen) UID 4827943)', + br'25 (FLAGS (\Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['FETCH', '4827313:4828442', '(FLAGS)']) + + def test_partial(self): + client, server = self._setup(make_simple_handler('PARTIAL', + ['* 1 FETCH (RFC822.TEXT<0.10> "0123456789")'])) + client.login('user', 'pass') + client.select() + typ, data = client.partial('1', 'RFC822.TEXT', '0', '10') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'1 (RFC822.TEXT<0.10> "0123456789")']) + self.assertEqual(server.args, ['1', 'RFC822.TEXT', '0', '10']) + + def test_search(self): + response = [] + client, server = self._setup(make_simple_handler('SEARCH', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.search(None, 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.search(None, 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.search('UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + def test_uid_search(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SEARCH completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.uid('SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.uid('SEARCH', 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['SEARCH', 'TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.uid('SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + typ, data = client.uid('SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX']) + + def test_sort(self): + response = [] + client, server = self._setup(make_simple_handler('SORT', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.sort('(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.sort('(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.sort('(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + def test_uid_sort(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SORT completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.uid('sort', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.uid('sort', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['SORT', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.uid('sort', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + def test_thread(self): + response = [] + client, server = self._setup(make_simple_handler('THREAD', response)) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + def test_uid_thread(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID THREAD completed')) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + def test_delete(self): + client, server = self._setup(make_simple_handler('DELETE')) + client.login('user', 'pass') + typ, data = client.delete('blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETE completed']) + self.assertEqual(server.args, ['blurdybloop']) + + typ, data = client.delete('foo/bar') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['foo/bar']) + + def test_rename(self): + client, server = self._setup(make_simple_handler('RENAME')) + client.login('user', 'pass') + typ, data = client.rename('blurdybloop', 'sarasoop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'RENAME completed']) + self.assertEqual(server.args, ['blurdybloop', 'sarasoop']) + + def test_subscribe(self): + client, server = self._setup(make_simple_handler('SUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.subscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + def test_unsubscribe(self): + client, server = self._setup(make_simple_handler('UNSUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.unsubscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'UNSUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + def test_list(self): + client, server = self._setup(make_simple_handler('LIST', + [r'* LIST (\Noselect) "/" ""', + r'* LIST (\Unmarked) "/" "~/Mail/foo"'])) + client.login('user', 'pass') + typ, data = client.list() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'(\Noselect) "/" ""', + br'(\Unmarked) "/" "~/Mail/foo"']) + self.assertEqual(server.args, ['""', '*']) + + typ, data = client.list('~/Mail/', '%') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['~/Mail/', '%']) + + def test_status(self): + client, server = self._setup(make_simple_handler('STATUS', + ['* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)'])) + client.login('user', 'pass') + typ, data = client.status('blurdybloop', '(UIDNEXT MESSAGES)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'blurdybloop (MESSAGES 231 UIDNEXT 44292)']) + self.assertEqual(server.args, ['blurdybloop', '(UIDNEXT MESSAGES)']) + + def test_getacl(self): + client, server = self._setup(make_simple_handler('GETACL', + ['* ACL INBOX Fred rwipslxetad'])) + client.login('user', 'pass') + typ, data = client.getacl('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX Fred rwipslxetad']) + self.assertEqual(server.args, ['INBOX']) + + def test_setacl(self): + client, server = self._setup(make_simple_handler('SETACL')) + client.login('user', 'pass') + typ, data = client.setacl('INBOX', 'Fred', 'rwipslxetad') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SETACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred', 'rwipslxetad']) + + def test_deleteacl(self): + client, server = self._setup(make_simple_handler('DELETEACL')) + client.login('user', 'pass') + typ, data = client.deleteacl('INBOX', 'Fred') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETEACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred']) + + def test_myrights(self): + client, server = self._setup(make_simple_handler('MYRIGHTS', + ['* MYRIGHTS INBOX rwiptsldaex'])) + client.login('user', 'pass') + typ, data = client.myrights('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX rwiptsldaex']) + self.assertEqual(server.args, ['INBOX']) + + def test_getquota(self): + client, server = self._setup(make_simple_handler('GETQUOTA', + ['* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquota('#news') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 10 512)']) + self.assertEqual(server.args, ['#news']) + + def test_getquotaroot(self): + client, server = self._setup(make_simple_handler('GETQUOTAROOT', + ['* QUOTAROOT INBOX ""', '* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquotaroot('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [[b'INBOX ""'], [b'"" (STORAGE 10 512)']]) + self.assertEqual(server.args, ['INBOX']) + + def test_setquota(self): + client, server = self._setup(make_simple_handler('SETQUOTA', + ['* QUOTA "" (STORAGE 512)'])) + client.login('user', 'pass') + typ, data = client.setquota('#news', '(STORAGE 512)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 512)']) + self.assertEqual(server.args, ['#news', '(STORAGE 512)']) + + def test_getannotation(self): + client, server = self._setup(make_simple_handler('GETANNOTATION', + ['* ANNOTATION INBOX "/comment" ("value.shared" "Hello")'])) + client.login('user', 'pass') + typ, data = client.getannotation('INBOX', '/comment', 'value.shared') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX "/comment" ("value.shared" "Hello")']) + self.assertEqual(server.args, ['INBOX', '/comment', 'value.shared']) + + def test_setannotation(self): + client, server = self._setup(make_simple_handler('SETANNOTATION')) + client.login('user', 'pass') + typ, data = client.setannotation('INBOX', '/comment', + '("value.shared" "My comment")') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, + ['INBOX', '/comment', '("value.shared" "My comment")']) + + def test_proxyauth(self): + client, server = self._setup(make_simple_handler('PROXYAUTH')) + client.login('user', 'pass') + typ, data = client.proxyauth('user') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'PROXYAUTH completed']) + self.assertEqual(server.args, ['user']) + + def test_xatom(self): + client, server = self._setup(make_simple_handler('MYCOMMAND', + completed='MYCOMMAND completed')) + client.login('user', 'pass') + self.addCleanup(imaplib.Commands.pop, 'MYCOMMAND', None) + typ, data = client.xatom('MYCOMMAND', 'arg1', 'arg2') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'MYCOMMAND completed']) + self.assertEqual(server.args, ['arg1', 'arg2']) def test_control_characters(self): client, _ = self._setup(SimpleIMAPHandler)