set_result on canceled future #34

Open
jcarmena opened this issue Jun 10, 2015 · 21 comments

@jcarmena

The asyncio documentation says:

Don’t call set_result() or set_exception() method of Future if the future is cancelled:
it would fail with an exception. For example, write:

if not fut.cancelled():
    fut.set_result('done')

The future's state is not checked before set_result() is called, so the call fails (I don't yet know how the future gets cancelled):

error on dispatch
Traceback (most recent call last):
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 196, in run
    yield from self.dispatch_frame()
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 177, in dispatch_frame
    yield from self.channels[frame.channel].dispatch_frame(frame)
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 91, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 84, in coro
    res = func(*args, **kw)
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 120, in open_ok
    fut.set_result(True)
  File "/usr/lib/python3.4/asyncio/futures.py", line 298, in set_result
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
asyncio.futures.InvalidStateError: CANCELLED: Future<CANCELLED>
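
For reference, the failure in the traceback and the guard quoted from the documentation can be reproduced in isolation. This is a minimal sketch that assumes nothing about aioamqp's internals: `set_result_if_pending` is a hypothetical helper, and it checks `fut.done()` rather than only `fut.cancelled()` so that a future that already holds a result is skipped as well. It uses `async`/`await` syntax instead of the `yield from` style shown in the thread.

```python
import asyncio


def set_result_if_pending(fut, value):
    """Hypothetical helper: set a result only while the future is pending."""
    if fut.done():  # True for cancelled futures and for already-resolved ones
        return False
    fut.set_result(value)
    return True


async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fut.cancel()

    # Unguarded call: this is the failure shown in the traceback.
    try:
        fut.set_result(True)
        raised = False
    except asyncio.InvalidStateError:
        raised = True

    # Guarded call: silently skipped because the future is cancelled.
    guarded = set_result_if_pending(fut, True)
    return raised, guarded


raised, guarded = asyncio.run(demo())
```

The guard only hides the symptom; it does not explain why the future was cancelled in the first place.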
@dzen
Contributor

dzen commented Jun 10, 2015

Thank you for this bug report.

The open question is why the future was cancelled, but this does show that we need to check futures more carefully.

Can you add more details on how you get this error? Under what conditions does it occur?
Thank you.

@jcarmena
Author

Sure. I'm using aioamqp inside a web app to spawn background tasks in another machine. I create a new channel in each request and then I publish to an exchange, like this:

channel = yield from protocol.channel()
yield from channel.publish("test", exchange_name='workers', routing_key='task1')

The issue comes when I do localhost ApacheBench tests with -n 10000 and -c 1000:

$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
apr_socket_recv: Connection reset by peer (104)
Total of 9873 requests completed

I think the -n and -c values needed to trigger this will differ on each machine.
I have reduced the code to

  1. get request,
  2. publish to exchange,

but the issue is still the same.

Regards

@dzen
Contributor

dzen commented Jun 10, 2015

Is there anything interesting in your AMQP broker's log? What does it say?

@jcarmena
Author

It's RabbitMQ. Nothing interesting there, only open/close connections.

I have noticed that it does not happen every time (although ApacheBench always fails with "apr_socket_recv: Connection reset by peer (104)"), so it seems that when the HTTP server breaks, the future is cancelled and some kind of race condition follows.

Perhaps it's not your fault, and you only need to check the future's state so the library behaves gracefully when the program breaks.
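
One way such a race could arise, sketched under assumptions: in asyncio, cancelling a task that is awaiting a future also cancels that future. If the HTTP server cancels a request handler while it waits for the broker's reply (an assumption here, not something confirmed in this thread), the reply would later arrive for a waiter that is already cancelled. `handler` and `waiter` are hypothetical names; no aioamqp or aiohttp code is involved.

```python
import asyncio


async def handler(waiter):
    # Stands in for a request handler waiting for the broker's reply.
    await waiter


async def demo():
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    task = asyncio.ensure_future(handler(waiter))
    await asyncio.sleep(0)  # let the handler start awaiting the waiter

    task.cancel()  # e.g. the client dropped the connection
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The cancellation propagated to the future the task was awaiting.
    return waiter.cancelled()


waiter_cancelled = asyncio.run(demo())
```

Any later `set_result()` on `waiter` would then raise `InvalidStateError` unless its state is checked first.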

@dzen dzen added the bug label Jun 11, 2015
@dzen dzen self-assigned this Jun 11, 2015
@dzen
Contributor

dzen commented Jun 11, 2015

I'll try something similar tomorrow afternoon. Which asyncio HTTP server are you using? aiohttp.web?

@dzen dzen added need inspection and removed bug labels Jun 11, 2015
@jcarmena
Author

Yes, it's aiohttp. This minimal code fails too:

import asyncio
import aioamqp
import textwrap
from aiohttp.web import Application, Response, StreamResponse


import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="Port")
args = parser.parse_args()
port = args.port or 8080


def index(request):
    channel = yield from protocol.channel()
    yield from channel.publish("test", exchange_name='workers', routing_key='work1')
    return Response(body=b'OK')


@asyncio.coroutine
def init(loop):
    global transport, protocol
    transport, protocol = yield from aioamqp.connect()

    app = Application(loop=loop)
    app.router.add_route('GET', '/', index)

    handler = app.make_handler()
    srv = yield from loop.create_server(handler, 'localhost', port)
    print("Server started at http://localhost:" + str(port))
    return srv, handler


transport, protocol = None, None
loop = asyncio.get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(handler.finish_connections())

Exchanges and queues are created previously and are durable.

@mwfrojdman
Contributor

index() creates a new channel on every request but never closes them. Does the problem still reproduce with a yield from channel.close() before return Response()?

This doesn't sound like abusing the protocol as the server gladly creates the new channels, but might be related to the error emerging.
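
The suggestion can be sketched as a `try`/`finally` so the channel is closed even when publishing fails. `FakeChannel` is a hypothetical stand-in that only mimics the `publish()` and `close()` calls used in this thread, so the example runs without a broker; it is not aioamqp's class. The sketch uses `async`/`await` rather than `yield from`.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a channel; records whether it was closed."""

    def __init__(self, fail=False):
        self.fail = fail
        self.closed = False

    async def publish(self, payload, exchange_name, routing_key):
        if self.fail:
            raise RuntimeError('publish failed')

    async def close(self):
        self.closed = True


async def publish_once(channel, payload):
    try:
        await channel.publish(payload, exchange_name='workers', routing_key='task1')
    finally:
        # Runs on success, on error, and on cancellation.
        await channel.close()


async def demo():
    ok, bad = FakeChannel(), FakeChannel(fail=True)
    await publish_once(ok, 'test')
    try:
        await publish_once(bad, 'test')
    except RuntimeError:
        pass
    return ok.closed, bad.closed


ok_closed, bad_closed = asyncio.run(demo())
```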

@dzen
Contributor

dzen commented Jun 11, 2015

Hello,

I used your script on my machine, I set index to be a coroutine, and added a yield from channel.close() in this coroutine.

results:

$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1604373 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software:
Server Hostname: localhost
Server Port: 8080

Document Path: /
Document Length: 2 bytes

Concurrency Level: 1000
Time taken for tests: 8.997 seconds
Complete requests: 10000
Failed requests: 0
Total transferred: 1310000 bytes
HTML transferred: 20000 bytes
Requests per second: 1111.43 [#/sec] (mean)
Time per request: 899.741 [ms] (mean)
Time per request: 0.900 [ms] (mean, across all concurrent requests)
Transfer rate: 142.18 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0  491 1083.9      0    7013
Processing:    18  213  590.8    106    6504
Waiting:       18  209  590.8    102    6503
Total:         18  703 1408.4    115    8982

Percentage of the requests served within a certain time (ms)
50% 115
66% 165
75% 1105
80% 1118
90% 1315
95% 3069
98% 7494
99% 8925
100% 8982 (longest request)

@jcarmena
Author

Argh, I forgot channel.close(), but it still fails. Try it with higher numbers.

@dzen
Contributor

dzen commented Jun 12, 2015

Hello again,

I ran 'ab' with :

 $ ab -n 1000000 -c 10000 localhost:8080/

apr_socket_recv: Connection reset by peer (104)
Total of 69595 requests completed

And got an error when encoding the frame:

Error handling request
Traceback (most recent call last):
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/server.py", line 272, in start
    yield from self.handle_request(message, payload)
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/web.py", line 85, in handle_request
    resp = yield from handler(request)
  File "ai.py", line 16, in index
    channel = yield from protocol.channel()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/protocol.py", line 306, in channel
    yield from channel.open()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 127, in open
    yield from self._write_frame(frame, request, no_wait=False, timeout=timeout, no_check_open=True)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 111, in _write_frame
    frame.write_frame(request)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/frame.py", line 385, in write_frame
    header = struct.pack('!BHI', self.frame_type, self.channel, payload.tell() + len(content_header))
struct.error: 'H' format requires 0 <= number <= 65535

I must check the docs to verify the frame handling. Can you test against the latest master?
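
The `struct.error` above can be reproduced on its own. The format string `'!BHI'` is taken from the traceback; the `H` field is an unsigned 16-bit integer, so any channel number above 65535 cannot be packed. `pack_frame_header` is a hypothetical wrapper used only for this illustration, and the frame type and size values are arbitrary.

```python
import struct


def pack_frame_header(frame_type, channel, size):
    # 'B' = 1-byte frame type, 'H' = 2-byte channel number, 'I' = 4-byte size,
    # all in network byte order without padding.
    return struct.pack('!BHI', frame_type, channel, size)


header = pack_frame_header(1, 65535, 0)  # largest channel number that fits

try:
    pack_frame_header(1, 65536, 0)  # one past the 16-bit limit
    overflowed = False
except struct.error:
    overflowed = True
```

A client that hands out a fresh, ever-increasing channel number per request would hit this limit after 65535 channels on one connection.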

@dzen
Contributor

dzen commented Jun 12, 2015

aioamqp cannot reuse previous channel IDs for now.

I created issue #36
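
Reusing channel IDs could look roughly like the following. This is only a sketch of the general idea, not the design tracked in issue #36: `ChannelIdPool` and its methods are hypothetical names. It relies on AMQP 0-9-1 channel numbers being 16-bit, with 0 reserved for connection-level frames.

```python
class ChannelIdPool:
    """Hypothetical allocator that hands out channel IDs and recycles freed ones."""

    def __init__(self, max_id=65535):
        self._max_id = max_id
        self._next = 1    # channel 0 is reserved for the connection itself
        self._free = []   # IDs released by closed channels

    def acquire(self):
        if self._free:
            return self._free.pop()
        if self._next > self._max_id:
            raise RuntimeError('no free channel IDs')
        channel_id = self._next
        self._next += 1
        return channel_id

    def release(self, channel_id):
        # The caller must release each ID exactly once, when its channel closes.
        self._free.append(channel_id)


pool = ChannelIdPool(max_id=2)  # tiny limit to show exhaustion and reuse
first_id = pool.acquire()
second_id = pool.acquire()
pool.release(first_id)
reused_id = pool.acquire()
```

With such a pool, the number of channels open at the same time is bounded by the ID range, but the total number opened over the lifetime of a connection is not.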

@jcarmena
Author

The latest master keeps failing. Remember that it does not show the error every time; it appears after two or three test runs.

@dzen
Contributor

dzen commented Jun 15, 2015

I'll retest this once the library reuses channel IDs.

@ariddell

I also encountered this. (Or at least I think I did.) I take it that fixing the problem is more complicated than just adding a check if fut.cancelled() in channel.py?
https://docs.python.org/3/library/asyncio-dev.html?highlight=cancelled#cancellation

@ariddell

Here's a log where the problem occurs. It looks like self._get_waiter('close') in channel.py is canceled, so self._get_waiter('close').set_result(True) raises an exception.

Dec 28 08:51:02 etna docker[25369]: ERROR:aioamqp.protocol:error on dispatch
Dec 28 08:51:02 etna docker[25369]: Traceback (most recent call last):
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 256, in run
Dec 28 08:51:02 etna docker[25369]: yield from self.dispatch_frame()
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 211, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from self.channels[frame.channel].dispatch_frame(frame)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from methods[(frame.class_id, frame.method_id)](frame)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/coroutines.py", line 206, in coro
Dec 28 08:51:02 etna docker[25369]: res = func(*args, **kw)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 165, in close_ok
Dec 28 08:51:02 etna docker[25369]: self._get_waiter('close').set_result(True)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/futures.py", line 329, in set_result
Dec 28 08:51:02 etna docker[25369]: raise InvalidStateError('{}: {!r}'.format(self._state, self))
Dec 28 08:51:02 etna docker[25369]: asyncio.futures.InvalidStateError: CANCELLED: <Future cancelled>

I think this might be occurring in a case where there are two attempts to close the channel.

@dzen
Contributor

dzen commented Dec 28, 2015

Hello @ariddell,

Would you please paste some code? It seems you had already closed the channel?

@ariddell

I'm not doing anything sophisticated, just a simple RPC setup; no multi-threading just asyncio. If I do have two coroutines that both close the connection/channel there shouldn't be an error, right?

I'll see if I can't figure out a way to reproduce the error.

@dzen
Contributor

dzen commented Dec 28, 2015

I have a few days to have a look right now. I can push a branch with a fix, but I would like to know how you're using aioamqp and how you're triggering this bug.

Thank you

@ariddell

I'm pretty sure I'm calling close on the connection and then close on a channel (associated with the connection). I know this is wrong but I think aioamqp might want to check on the future being cancelled.

In case you're looking for prior art, here is how aiohttp closes a websocket -- They have a _closed variable that tracks state. And they return False if the connection is already closed.

https://github.com/KeepSafe/aiohttp/blob/e09b86204c9099389c530b2886770e0060a05f63/aiohttp/web_ws.py#L174

    @asyncio.coroutine
    def close(self, *, code=1000, message=b''):
        if self._writer is None:
            raise RuntimeError('Call .prepare() first')

        if not self._closed:
            self._closed = True
            try:
                self._writer.close(code, message)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = 1006
                raise
            except Exception as exc:
                self._close_code = 1006
                self._exception = exc
                return True

            if self._closing:
                return True

            while True:
                try:
                    msg = yield from asyncio.wait_for(
                        self._reader.read(),
                        timeout=self._timeout, loop=self._loop)
                except asyncio.CancelledError:
                    self._close_code = 1006
                    raise
                except Exception as exc:
                    self._close_code = 1006
                    self._exception = exc
                    return True

                if msg.tp == MsgType.close:
                    self._close_code = msg.data
                    return True
        else:
            return False
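
Applied to a channel-like object, the same `_closed` flag idea might look like this. `MiniChannel` is a hypothetical stand-in, not aioamqp's channel: the broker's reply is simulated with `loop.call_soon`, and the sketch uses `async`/`await` rather than `yield from`. A second `close()` returns False, and a late or duplicate close-ok is ignored instead of raising.

```python
import asyncio


class MiniChannel:
    """Hypothetical channel with an idempotent close()."""

    def __init__(self):
        self._closed = False
        self._close_waiter = None

    async def close(self):
        if self._closed:
            return False  # already closed or closing: nothing to do
        self._closed = True
        loop = asyncio.get_running_loop()
        self._close_waiter = loop.create_future()
        # A real client would send the close frame here; the reply is simulated.
        loop.call_soon(self.close_ok)
        await self._close_waiter
        return True

    def close_ok(self):
        waiter = self._close_waiter
        # Ignore the reply if nobody is waiting or the waiter is already done.
        if waiter is not None and not waiter.done():
            waiter.set_result(True)


async def demo():
    channel = MiniChannel()
    first = await channel.close()
    second = await channel.close()
    channel.close_ok()  # duplicate reply: silently ignored
    return first, second


first, second = asyncio.run(demo())
```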

@dzen
Contributor

dzen commented Dec 29, 2015

Hello @ariddell.

In aioamqp, the code is a little different: you get an exception when the confirmation from the server is received (https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.close-ok), but the whole channel is already marked as closed.

Could you please tell me how you're triggering this behaviour? I'll dive into it and probably rework the way we close the channel.

Thank you.

@ariddell

I don't know how the exception is happening. I think it's something in a finally clause so it's not affecting my application. I'll keep you posted. Thanks for your work on this!
