diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py index fa23d09806e6fb1..1d1974e51d108dd 100644 --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -26,6 +26,68 @@ CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "keycert3.pem") CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "certdata", "pycacert.pem") +_quoted_string = re.compile(r'"(?:[^"\\]|\\.)*"') + + +def splitargs(line): + # Split a command line into IMAP arguments: quoted strings, balanced + # (possibly nested) parenthesized lists, and bare atoms. + args = [] + i, n = 0, len(line) + while i < n: + if line[i] == ' ': + i += 1 + continue + start = i + depth = 0 + while i < n and (depth or line[i] != ' '): + c = line[i] + if c == '"': + i = _quoted_string.match(line, i).end() + continue + elif c == '(': + depth += 1 + elif c == ')': + depth -= 1 + i += 1 + args.append(line[start:i]) + return args + + +def parse_seq_number(s, maxmsg): + if s == '*': + return maxmsg + return int(s) + + +def parse_sequence_set(arg, maxmsg): + for s in arg.split(','): + if ':' in s: + lo, hi = s.split(':') + lo = parse_seq_number(lo, maxmsg) + hi = parse_seq_number(hi, maxmsg) + yield from range(min(lo, hi), max(lo, hi) + 1) + else: + yield parse_seq_number(s, maxmsg) + + +def make_simple_handler(command, untagged_response=(), + completed=None): + if completed is None: + completed = f'{command} completed' + def cmd(self, tag, args): + for msg in untagged_response: + self._send_textline(msg) + self._send_tagged(tag, 'OK', completed) + cmd.__name__ = 'cmd_' + command + class Handler(SimpleIMAPHandler): + pass + Handler.__name__ = command.title() + 'Handler' + setattr(Handler, cmd.__name__, cmd) + Handler.__qualname__ = Handler.__name__ + cmd.__qualname__ = Handler.__qualname__ + '.' + cmd.__name__ + return Handler + class TestImaplib(unittest.TestCase): @@ -119,7 +181,7 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): def setup(self): super().setup() - self.server.is_selected = False + self.server.is_selected = None self.server.logged = None def _send(self, message): @@ -167,10 +229,11 @@ def handle(self): except StopIteration: self.continuation = None continue - splitline = line.decode('ASCII').split() + splitline = splitargs(line.decode().removesuffix('\r\n')) tag = splitline[0] cmd = splitline[1] args = splitline[2:] + self.server.args = args if hasattr(self, 'cmd_' + cmd): continuation = getattr(self, 'cmd_' + cmd)(tag, args) @@ -197,13 +260,18 @@ def cmd_LOGIN(self, tag, args): self._send_tagged(tag, 'OK', 'LOGIN completed') def cmd_SELECT(self, tag, args): - self.server.is_selected = True + self.server.is_selected = args self._send_line(b'* 2 EXISTS') self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + def cmd_EXAMINE(self, tag, args): + self.server.is_selected = args + self._send_line(b'* 2 EXISTS') + self._send_tagged(tag, 'OK', '[READ-ONLY] EXAMINE completed.') + def cmd_UNSELECT(self, tag, args): - if self.server.is_selected: - self.server.is_selected = False + if self.server.is_selected is not None: + self.server.is_selected = None self._send_tagged(tag, 'OK', 'Returned to authenticated state. (Success)') else: self._send_tagged(tag, 'BAD', 'No mailbox selected') @@ -712,18 +780,20 @@ def test_logout(self): self.assertEqual(client.state, 'LOGOUT') def test_lsub(self): - class LsubCmd(SimpleIMAPHandler): - def cmd_LSUB(self, tag, args): - self._send_textline('* LSUB () "." directoryA') - return self._send_tagged(tag, 'OK', 'LSUB completed') - client, _ = self._setup(LsubCmd) + client, server = self._setup(make_simple_handler('LSUB', + ['* LSUB () "." directoryA', '* LSUB () "." directoryB'])) client.login('user', 'pass') typ, data = client.lsub() self.assertEqual(typ, 'OK') - self.assertEqual(data[0], b'() "." directoryA') + self.assertEqual(data, [b'() "." directoryA', b'() "." directoryB']) + self.assertEqual(server.args, ['""', '*']) + + typ, data = client.lsub('~/Mail/', '%') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['~/Mail/', '%']) def test_unselect(self): - client, _ = self._setup(SimpleIMAPHandler) + client, server = self._setup(SimpleIMAPHandler) client.login('user', 'pass') typ, data = client.select() self.assertEqual(typ, 'OK') @@ -733,6 +803,584 @@ def test_unselect(self): self.assertEqual(typ, 'OK') self.assertEqual(data[0], b'Returned to authenticated state. (Success)') self.assertEqual(client.state, 'AUTH') + self.assertIsNone(server.is_selected) + + def test_enable(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'IMAP4rev1 ID LITERAL+ ENABLE X-GOOD-IDEA' + def cmd_ENABLE(self, tag, args): + capabilities = self.capabilities.split() + for arg in args: + if arg in capabilities: + self._send_textline('* ENABLED ' + arg) + self._send_tagged(tag, 'OK', 'foo') + + client, server = self._setup(EnableHandler) + client.login('user', 'pass') + code, data = client.enable('CONDSTORE X-GOOD-IDEA') + self.assertEqual(code, 'OK') + self.assertEqual(data, [b'foo']) + self.assertEqual(server.args, ['CONDSTORE', 'X-GOOD-IDEA']) + + def test_select(self): + client, server = self._setup(SimpleIMAPHandler) + client.login('user', 'pass') + typ, data = client.select() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'2') + self.assertEqual(server.is_selected, ['INBOX']) + + typ, data = client.select('Archive') + self.assertEqual(typ, 'OK') + self.assertEqual(server.is_selected, ['Archive']) + + # readonly=True issues EXAMINE instead of SELECT + # (there is no separate examine() method). + typ, data = client.select(readonly=True) + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'2') + self.assertEqual(server.is_selected, ['INBOX']) + self.assertTrue(client.is_readonly) + + def test_expunge(self): + client, server = self._setup(make_simple_handler('EXPUNGE', + ['* 3 EXPUNGE', '* 3 EXPUNGE', '* 5 EXPUNGE', '* 8 EXPUNGE'])) + client.login('user', 'pass') + client.select() + typ, data = client.expunge() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'3', b'3', b'5', b'8']) + + def test_close(self): + client, server = self._setup(make_simple_handler('CLOSE')) + client.login('user', 'pass') + client.select() + typ, data = client.close() + self.assertEqual(typ, 'OK') + self.assertEqual(client.state, 'AUTH') + + def test_check(self): + client, server = self._setup(make_simple_handler('CHECK')) + client.login('user', 'pass') + client.select() + typ, data = client.check() + self.assertEqual(typ, 'OK') + + def test_noop(self): + client, server = self._setup(make_simple_handler('NOOP', + ['* 4 EXISTS'])) + client.login('user', 'pass') + typ, data = client.noop() + self.assertEqual(typ, 'OK') + # NOOP is used to pick up server-pushed untagged responses. + self.assertEqual(client.untagged_responses['EXISTS'], [b'4']) + + def test_namespace(self): + client, server = self._setup(make_simple_handler('NAMESPACE', + ['* NAMESPACE (("" "/")) NIL NIL'])) + client.login('user', 'pass') + typ, data = client.namespace() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'(("" "/")) NIL NIL') + + def test_capability(self): + client, server = self._setup(make_simple_handler('CAPABILITY', + ['* CAPABILITY IMAP4rev1 IDLE NAMESPACE'])) + client.login('user', 'pass') + typ, data = client.capability() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'IMAP4rev1 IDLE NAMESPACE']) + + def test_recent(self): + # recent() prods the server with NOOP if no RECENT was seen yet. + client, server = self._setup(make_simple_handler('NOOP', + ['* 5 RECENT'])) + client.login('user', 'pass') + typ, data = client.recent() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'5']) + + def test_response(self): + client, server = self._setup(SimpleIMAPHandler) + client.login('user', 'pass') + client.select() # the handler answers with '* 2 EXISTS' + typ, data = client.response('EXISTS') + self.assertEqual(typ, 'EXISTS') + self.assertEqual(data, [b'2']) + # The value is cleared once read. + typ, data = client.response('EXISTS') + self.assertEqual(data, [None]) + + def test_create(self): + client, server = self._setup(make_simple_handler('CREATE')) + client.login('user', 'pass') + typ, data = client.create('owatagusiam/') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/']) + + typ, data = client.create('owatagusiam/blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'CREATE completed']) + self.assertEqual(server.args, ['owatagusiam/blurdybloop']) + + def test_copy(self): + client, server = self._setup(make_simple_handler('COPY')) + client.login('user', 'pass') + client.select() + typ, data = client.copy('2:4', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'COPY completed']) + self.assertEqual(server.args, ['2:4', 'MEETING']) + + def test_uid_copy(self): + client, server = self._setup(make_simple_handler('UID', + completed='UID COPY completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('copy', '4827313:4828442', 'MEETING') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [None]) + self.assertEqual(server.args, ['COPY', '4827313:4828442', 'MEETING']) + + def test_store(self): + client, server = self._setup(make_simple_handler('STORE', [ + r'* 2 FETCH (FLAGS (\Deleted \Seen))', + r'* 3 FETCH (FLAGS (\Deleted))', + r'* 4 FETCH (FLAGS (\Deleted \Flagged \Seen))', + ])) + client.login('user', 'pass') + client.select() + typ, data = client.store('2:4', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Deleted \Seen))', + br'3 (FLAGS (\Deleted))', + br'4 (FLAGS (\Deleted \Flagged \Seen))', + ]) + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + typ, data = client.store('2:4', '+FLAGS', r'\Deleted') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['2:4', '+FLAGS', r'(\Deleted)']) + + def test_uid_store(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Deleted \Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Deleted) UID 4827943)', + r'* 25 FETCH (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ], 'UID STORE completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('store', '4827313:4828442', '+FLAGS', r'(\Deleted)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Deleted \Seen) UID 4827313)', + br'24 (FLAGS (\Deleted) UID 4827943)', + br'25 (FLAGS (\Deleted \Flagged \Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['STORE', '4827313:4828442', '+FLAGS', r'(\Deleted)']) + + def test_fetch(self): + # The handler expands the requested sequence set and answers for + # exactly those messages, so the test exercises the round trip of + # the message set, not just a canned reply. + class FetchHandler(SimpleIMAPHandler): + messages = 4 + def cmd_SELECT(self, tag, args): + self.server.is_selected = args + self._send_line(b'* %d EXISTS' % self.messages) + self._send_tagged(tag, 'OK', '[READ-WRITE] SELECT completed.') + def cmd_FETCH(self, tag, args): + for n in parse_sequence_set(args[0], self.messages): + self._send_textline(r'* %d FETCH (FLAGS (\Seen))' % n) + self._send_tagged(tag, 'OK', 'FETCH completed') + + client, server = self._setup(FetchHandler) + client.login('user', 'pass') + client.select() + typ, data = client.fetch('2:4', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'2 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['2:4', '(FLAGS)']) + + # A comma-separated set with an open range up to '*'. + typ, data = client.fetch('1,3:*', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'1 (FLAGS (\Seen))', + br'3 (FLAGS (\Seen))', + br'4 (FLAGS (\Seen))', + ]) + self.assertEqual(server.args, ['1,3:*', '(FLAGS)']) + + # An item with nested parentheses is sent (and parsed) as a + # single argument. + typ, data = client.fetch('1', '(BODY[HEADER.FIELDS (DATE FROM)])') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'1 (FLAGS (\Seen))']) + self.assertEqual(server.args, ['1', '(BODY[HEADER.FIELDS (DATE FROM)])']) + + def test_uid_fetch(self): + client, server = self._setup(make_simple_handler('UID', [ + r'* 23 FETCH (FLAGS (\Seen) UID 4827313)', + r'* 24 FETCH (FLAGS (\Seen) UID 4827943)', + r'* 25 FETCH (FLAGS (\Seen) UID 4828442)', + ], 'UID FETCH completed')) + client.login('user', 'pass') + client.select() + typ, data = client.uid('fetch', '4827313:4828442', '(FLAGS)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + br'23 (FLAGS (\Seen) UID 4827313)', + br'24 (FLAGS (\Seen) UID 4827943)', + br'25 (FLAGS (\Seen) UID 4828442)', + ]) + self.assertEqual(server.args, ['FETCH', '4827313:4828442', '(FLAGS)']) + + def test_partial(self): + client, server = self._setup(make_simple_handler('PARTIAL', + ['* 1 FETCH (RFC822.TEXT<0.10> "0123456789")'])) + client.login('user', 'pass') + client.select() + typ, data = client.partial('1', 'RFC822.TEXT', '0', '10') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'1 (RFC822.TEXT<0.10> "0123456789")']) + self.assertEqual(server.args, ['1', 'RFC822.TEXT', '0', '10']) + + def test_search(self): + response = [] + client, server = self._setup(make_simple_handler('SEARCH', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.search(None, 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.search(None, 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.search('UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + def test_uid_search(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SEARCH completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SEARCH 2 84 882'] + typ, data = client.uid('SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'2 84 882']) + self.assertEqual(server.args, ['SEARCH', 'FLAGGED', 'SINCE', '1-Feb-1994', 'NOT', 'FROM', '"Smith"']) + + response[:] = ['* SEARCH'] + typ, data = client.uid('SEARCH', 'TEXT', '"string not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'']) + self.assertEqual(server.args, ['SEARCH', 'TEXT', '"string not in mailbox"']) + + response[:] = ['* SEARCH 43'] + typ, data = client.uid('SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'43']) + self.assertEqual(server.args, ['SEARCH', 'CHARSET', 'UTF-8', 'TEXT', 'XXXXXX']) + + typ, data = client.uid('SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['SEARCH', 'CHARSET', '"NF_Z_62-010_(1973)"', 'TEXT', 'XXXXXX']) + + def test_sort(self): + response = [] + client, server = self._setup(make_simple_handler('SORT', response)) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.sort('(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.sort('(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.sort('(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + def test_uid_sort(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID SORT completed')) + client.login('user', 'pass') + client.select() + response[:] = ['* SORT 2 84 882'] + typ, data = client.uid('sort', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'2 84 882']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'UTF-8', 'SINCE', '1-Feb-1994']) + + response[:] = ['* SORT 5 3 4 1 2'] + typ, data = client.uid('sort', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'5 3 4 1 2']) + self.assertEqual(server.args, ['SORT', '(SUBJECT REVERSE DATE)', 'UTF-8', 'ALL']) + + response[:] = ['* SORT'] + typ, data = client.uid('sort', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'']) + self.assertEqual(server.args, ['SORT', '(SUBJECT)', 'US-ASCII', 'TEXT', '"not in mailbox"']) + + def test_thread(self): + response = [] + client, server = self._setup(make_simple_handler('THREAD', response)) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.thread('ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + def test_uid_thread(self): + response = [] + client, server = self._setup(make_simple_handler('UID', response, + 'UID THREAD completed')) + client.login('user', 'pass') + client.select() + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)(170)(171)' + '(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + '(183)(182)(188)(184)(185)(186)(187)(189))(190)' + '(191)(192)(193)(194 195)(196 (197)(198))(199)' + '(200 202)(201)(203)(204)(205)(206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)(170)(171)' + b'(173)(174 (175)(176)(178)(181)(180))(179)(177 ' + b'(183)(182)(188)(184)(185)(186)(187)(189))(190)' + b'(191)(192)(193)(194 195)(196 (197)(198))(199)' + b'(200 202)(201)(203)(204)(205)(206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'UTF-8', 'SINCE', '5-MAR-2000']) + + response[:] = [ + '* THREAD (166)(167)(168)(169)(172)((170)(179))' + '(171)(173)((174)(175)(176)(178)(181)(180))' + '((177)(183)(182)(188 (184)(189))(185 186)(187))' + '(190)(191)(192)(193)((194)(195 196))(197 198)' + '(199)(200 202)(201)(203)(204)(205 206 207)(208)'] + typ, data = client.uid('THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [ + b'(166)(167)(168)(169)(172)((170)(179))' + b'(171)(173)((174)(175)(176)(178)(181)(180))' + b'((177)(183)(182)(188 (184)(189))(185 186)(187))' + b'(190)(191)(192)(193)((194)(195 196))(197 198)' + b'(199)(200 202)(201)(203)(204)(205 206 207)(208)']) + self.assertEqual(server.args, ['THREAD', 'ORDEREDSUBJECT', 'US-ASCII', 'TEXT', '"gewp"']) + + def test_delete(self): + client, server = self._setup(make_simple_handler('DELETE')) + client.login('user', 'pass') + typ, data = client.delete('blurdybloop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETE completed']) + self.assertEqual(server.args, ['blurdybloop']) + + typ, data = client.delete('foo/bar') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['foo/bar']) + + def test_rename(self): + client, server = self._setup(make_simple_handler('RENAME')) + client.login('user', 'pass') + typ, data = client.rename('blurdybloop', 'sarasoop') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'RENAME completed']) + self.assertEqual(server.args, ['blurdybloop', 'sarasoop']) + + def test_subscribe(self): + client, server = self._setup(make_simple_handler('SUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.subscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + def test_unsubscribe(self): + client, server = self._setup(make_simple_handler('UNSUBSCRIBE')) + client.login('user', 'pass') + typ, data = client.unsubscribe('#news.comp.mail.mime') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'UNSUBSCRIBE completed']) + self.assertEqual(server.args, ['#news.comp.mail.mime']) + + def test_list(self): + client, server = self._setup(make_simple_handler('LIST', + [r'* LIST (\Noselect) "/" ""', + r'* LIST (\Unmarked) "/" "~/Mail/foo"'])) + client.login('user', 'pass') + typ, data = client.list() + self.assertEqual(typ, 'OK') + self.assertEqual(data, [br'(\Noselect) "/" ""', + br'(\Unmarked) "/" "~/Mail/foo"']) + self.assertEqual(server.args, ['""', '*']) + + typ, data = client.list('~/Mail/', '%') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, ['~/Mail/', '%']) + + def test_status(self): + client, server = self._setup(make_simple_handler('STATUS', + ['* STATUS blurdybloop (MESSAGES 231 UIDNEXT 44292)'])) + client.login('user', 'pass') + typ, data = client.status('blurdybloop', '(UIDNEXT MESSAGES)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'blurdybloop (MESSAGES 231 UIDNEXT 44292)']) + self.assertEqual(server.args, ['blurdybloop', '(UIDNEXT MESSAGES)']) + + def test_getacl(self): + client, server = self._setup(make_simple_handler('GETACL', + ['* ACL INBOX Fred rwipslxetad'])) + client.login('user', 'pass') + typ, data = client.getacl('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX Fred rwipslxetad']) + self.assertEqual(server.args, ['INBOX']) + + def test_setacl(self): + client, server = self._setup(make_simple_handler('SETACL')) + client.login('user', 'pass') + typ, data = client.setacl('INBOX', 'Fred', 'rwipslxetad') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'SETACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred', 'rwipslxetad']) + + def test_deleteacl(self): + client, server = self._setup(make_simple_handler('DELETEACL')) + client.login('user', 'pass') + typ, data = client.deleteacl('INBOX', 'Fred') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'DELETEACL completed']) + self.assertEqual(server.args, ['INBOX', 'Fred']) + + def test_myrights(self): + client, server = self._setup(make_simple_handler('MYRIGHTS', + ['* MYRIGHTS INBOX rwiptsldaex'])) + client.login('user', 'pass') + typ, data = client.myrights('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX rwiptsldaex']) + self.assertEqual(server.args, ['INBOX']) + + def test_getquota(self): + client, server = self._setup(make_simple_handler('GETQUOTA', + ['* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquota('#news') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 10 512)']) + self.assertEqual(server.args, ['#news']) + + def test_getquotaroot(self): + client, server = self._setup(make_simple_handler('GETQUOTAROOT', + ['* QUOTAROOT INBOX ""', '* QUOTA "" (STORAGE 10 512)'])) + client.login('user', 'pass') + typ, data = client.getquotaroot('INBOX') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [[b'INBOX ""'], [b'"" (STORAGE 10 512)']]) + self.assertEqual(server.args, ['INBOX']) + + def test_setquota(self): + client, server = self._setup(make_simple_handler('SETQUOTA', + ['* QUOTA "" (STORAGE 512)'])) + client.login('user', 'pass') + typ, data = client.setquota('#news', '(STORAGE 512)') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'"" (STORAGE 512)']) + self.assertEqual(server.args, ['#news', '(STORAGE 512)']) + + def test_getannotation(self): + client, server = self._setup(make_simple_handler('GETANNOTATION', + ['* ANNOTATION INBOX "/comment" ("value.shared" "Hello")'])) + client.login('user', 'pass') + typ, data = client.getannotation('INBOX', '/comment', 'value.shared') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'INBOX "/comment" ("value.shared" "Hello")']) + self.assertEqual(server.args, ['INBOX', '/comment', 'value.shared']) + + def test_setannotation(self): + client, server = self._setup(make_simple_handler('SETANNOTATION')) + client.login('user', 'pass') + typ, data = client.setannotation('INBOX', '/comment', + '("value.shared" "My comment")') + self.assertEqual(typ, 'OK') + self.assertEqual(server.args, + ['INBOX', '/comment', '("value.shared" "My comment")']) + + def test_proxyauth(self): + client, server = self._setup(make_simple_handler('PROXYAUTH')) + client.login('user', 'pass') + typ, data = client.proxyauth('user') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'PROXYAUTH completed']) + self.assertEqual(server.args, ['user']) + + def test_xatom(self): + client, server = self._setup(make_simple_handler('MYCOMMAND', + completed='MYCOMMAND completed')) + client.login('user', 'pass') + self.addCleanup(imaplib.Commands.pop, 'MYCOMMAND', None) + typ, data = client.xatom('MYCOMMAND', 'arg1', 'arg2') + self.assertEqual(typ, 'OK') + self.assertEqual(data, [b'MYCOMMAND completed']) + self.assertEqual(server.args, ['arg1', 'arg2']) def test_control_characters(self): client, _ = self._setup(SimpleIMAPHandler)