From 1cfa72ce8890d5cd90745faa2890e69a8d7258bb Mon Sep 17 00:00:00 2001 From: SilentCtrl Date: Sat, 13 Oct 2018 17:20:48 -0700 Subject: [PATCH] Asynchronous Hibike (#633) * Use profiling and event loop libs * [HIBIKE] C extension (#622) * Add hibike_packet as submodule * Enable use of hibike_packet Detects whether the extension is installed by trying to import it. * Update hibike_packet * Remove process_buffer * [HIBIKE] fixing Disconnect and register_sensor syntax errors * Final preparation for asyncio merge (#631) * Do not crash on invalid COBS data * Rename hibike_process_async to hibike_process Remove old hibike_process and replace it with async version. API compatibility is maintained, so Runtime will not be changed. * Prevent hibike_tester hang after termination hibike_tester would hang after terminating hibike_process, because one of its threads was still running. We tell the thread to shut down after the process is done instead of running forever. * Remove stub file * Remove send_transport This function is identical to send, so it doesn't make sense to keep it around. * Replace virtual device with async version The asynchronous version uses less CPU and has saner defaults. * Remove virtual device dependency on hibike_process Some parts of the virtual device perform similar functions to parts of hibike_process, but it is better that the two implementations be allowed to evolve independently. * Update tests for async; add read/write tests Async tests need to deal with the event loop. In addition, we test that hibike is actually reading and writing to devices using virtual devices. * Remove outdated portions of README, update others * Add explanation for read/write retries * Add test for nonexistent device errors As it turns out, we were not sending error messages when a nonexistent device was accessed; a test should ensure this behavior stays in. * Update developer documentation * Fix lint errors * [RUNTIME] changed kill process timeout from one second to three seconds * Start async hibike rewrite * [HIBIKE] Full implementation of async smart sensor protocol (#523) Now, SmartSensorProtocol automagically registers itself with Hibike when it connects. * Fix bugs related to virtual devices Essentially, we exclude existing serial ports from our scan for new ones, but this didn't extend to virtual devices, leading them to be added multiple times. The other bug was that "connection_lost()" could get called before a device was identified, triggering a key error when we tried to take it out of the devices map. This is now checked for. Add function to create heartbeat requests * Add async virtual devices In addition, async virtual devices send heartbeat requests too, although they don't do anything with the responses. * Don't block event loop * Don't block event loop on state queue * Port process tests to async * Use aiofiles for nonblocking IO * Allow profiling measurements * Use an external cobs library * Memoize a few hot functions * [HIBIKE] Pause reading of hibike messages when max size exceeded * [HIBIKE] implemented backpressure on device side * Exclude new name of hibike_packet from linting * Unify runtime and hibike pipfiles * Bump required python version to 3.7 * Bump runtime version --- .gitignore | 3 + .gitmodules | 3 + hibike/DEVELOPERS.md | 10 +- hibike/Makefile | 2 +- hibike/Pipfile | 15 +- hibike/Pipfile.lock | 119 +++- hibike/README.md | 62 +- hibike/hibike_message.py | 69 +- hibike/hibike_packet_extension | 1 + hibike/hibike_process.py | 656 +++++++++----------- hibike/hibike_tester.py | 21 +- hibike/hibike_tests/hibike_process_tests.py | 180 +++++- hibike/hibike_tests/utils.py | 41 ++ hibike/lib/hibike/hibike_device.cpp | 20 + hibike/lib/hibike/hibike_message.cpp | 20 +- hibike/travis/Makefile | 2 +- hibike/virtual_device.py | 296 +++++---- hibike/virtual_device_defaults.json | 13 + makerelease | 2 +- runtime/Pipfile | 10 + runtime/Pipfile.lock | 107 +++- runtime/runtime.py | 2 +- runtime/runtimeUtil.py | 2 +- runtime/studentapi.py | 13 + runtime/test_async.py | 106 ++++ 25 files changed, 1127 insertions(+), 648 deletions(-) create mode 100644 .gitmodules create mode 160000 hibike/hibike_packet_extension mode change 100755 => 100644 hibike/hibike_process.py create mode 100644 hibike/virtual_device_defaults.json create mode 100644 runtime/test_async.py diff --git a/.gitignore b/.gitignore index 73acdd61..4a19cc18 100644 --- a/.gitignore +++ b/.gitignore @@ -396,3 +396,6 @@ hibike/virtual_devices.txt # Webstorm project settings .idea + +# Visual Studio Code config +.vscode diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..5358e5fe --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "hibike/hibike_packet"] + path = hibike/hibike_packet_extension + url = https://github.com/pioneers/hibike_packet.git diff --git a/hibike/DEVELOPERS.md b/hibike/DEVELOPERS.md index aa4cf500..189f47ff 100644 --- a/hibike/DEVELOPERS.md +++ b/hibike/DEVELOPERS.md @@ -29,20 +29,20 @@ Look at hibike/devices/ExampleDevice for an example implementation of a hibike d Device specific firmware must: - - Implement the arduino setup() and loop() functions, which must call hibike_setup() and hibike_loop() correspondingly - - Implement device_write() and device_data_update(), which are called when the BBB wants to either write to the device or get a data update from the device. + - Implement the arduino `setup()` and `loop()` functions, which must call `hibike_setup()` and `hibike_loop()` correspondingly + - Implement `device_write()` and `device_read()`, which are called when the BBB wants to write to or read from the device, respectively. - Both functions are given with a buffer containing data and a param index. Because params can have different data types and therefore values with different sizes, these functions must return the number of bytes they read from/wrote to their buffer. - Both functions are given the size of the buffer to avoid overflow. They must return 0 if their normal operation would overflow the buffer. - Conform to their device type definition described in hibikeDevices.json - Have the uid in their .h file filled out correctly - The uid references the device type id, which by convention is defined in an enum in hibike/lib/hibike/devices.h - - Call hibike_loop() at a higher frequency than both + - Call `hibike_loop()` at a higher frequency than both - The expected frequency of the BBB sending packets to the device - The expected subscription frequency (The BBB will subscribe to device data updates at some frequency) ### Using libraries in device firmware -For standard arduino libraries like or : +For standard arduino libraries like `` or ``: - go to hibike/Makefile and update the line that looks starts with `ARDUINO_LIBS :=` @@ -51,4 +51,4 @@ For external libraries: - add the library folder to hibike/lib - see the libraries already there - go to hibike/Makefile and update the line that looks starts with `SKETCH_LIBS =` - \ No newline at end of file + diff --git a/hibike/Makefile b/hibike/Makefile index 302f0249..40e4a1a1 100644 --- a/hibike/Makefile +++ b/hibike/Makefile @@ -55,7 +55,7 @@ AVRDUDE_CONF = /opt/arduino-1.8.1/hardware/tools/avr/etc/avrdude.conf CFLAGS_STD = -std=gnu11 ### CXXFLAGS_STD -CXXFLAGS_STD = -std=gnu++11 -fpermissive -fno-exceptions -ffunction-sections -fdata-sections -fno-threadsafe-statics -fno-devirtualize -fno-use-cxa-atexit +CXXFLAGS_STD = -std=gnu++11 -fpermissive -fno-exceptions -ffunction-sections -fdata-sections -fno-threadsafe-statics -fno-devirtualize -fno-use-cxa-atexit -fno-strict-aliasing ### CPPFLAGS ### Flags you might want to set for debugging purpose. Comment to stop. diff --git a/hibike/Pipfile b/hibike/Pipfile index 92583f6d..602fea13 100644 --- a/hibike/Pipfile +++ b/hibike/Pipfile @@ -1,20 +1,23 @@ [[source]] - url = "https://pypi.python.org/simple" verify_ssl = true [requires] - -python_version = "3.6" +python_version = "3.7" [packages] - +protobuf = "==3.2.0" pyserial = "==3.2.1" flask = "*" +aioprocessing = "*" +pyserial-asyncio = "*" +aiofiles = "*" +uvloop = "*" +cobs = "*" [dev-packages] - -pylint = "==1.7.2" +pylint = "==1.8.1" +yappi = "*" diff --git a/hibike/Pipfile.lock b/hibike/Pipfile.lock index 4312baab..b92357cd 100644 --- a/hibike/Pipfile.lock +++ b/hibike/Pipfile.lock @@ -1,24 +1,24 @@ { "_meta": { "hash": { - "sha256": "16aeb8cfbde1bb381c1893ad3d3ecfd39e8bf76cc09c6534d3e80238cda05c52" + "sha256": "f22def7bd4ef502618688e3e966ee763d3a413a4cbbf837f3cf18c360bf09436" }, "host-environment-markers": { "implementation_name": "cpython", - "implementation_version": "3.5.2", + "implementation_version": "3.6.6", "os_name": "posix", "platform_machine": "x86_64", "platform_python_implementation": "CPython", - "platform_release": "4.10.0-42-generic", + "platform_release": "4.15.0-36-generic", "platform_system": "Linux", - "platform_version": "#46~16.04.1-Ubuntu SMP Mon Dec 4 15:57:59 UTC 2017", - "python_full_version": "3.5.2", - "python_version": "3.5", + "platform_version": "#39~16.04.1-Ubuntu SMP Tue Sep 25 08:59:23 UTC 2018", + "python_full_version": "3.6.6", + "python_version": "3.6", "sys_platform": "linux" }, "pipfile-spec": 6, "requires": { - "python_version": "3.6" + "python_version": "3.5" }, "sources": [ { @@ -28,19 +28,39 @@ ] }, "default": { + "aiofiles": { + "hashes": [ + "sha256:1e644c2573f953664368de28d2aa4c89dfd64550429d0c27c4680ccd3aa4985d", + "sha256:021ea0ba314a86027c166ecc4b4c07f2d40fc0f4b3a950d1868a0f2571c2bbee" + ], + "version": "==0.4.0" + }, + "aioprocessing": { + "hashes": [ + "sha256:9b88f4e51b7358e7a53ea2b6cfd1e000848270a1b09532a410b6d38e015b7de5", + "sha256:b6952b476586c2a2e0d2802f42d73e6898ab474213ffda788d720a3fb57b01fb" + ], + "version": "==1.0.1" + }, "click": { "hashes": [ - "sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d", - "sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b" + "sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13", + "sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7" + ], + "version": "==7.0" + }, + "cobs": { + "hashes": [ + "sha256:ae3319b5704d9269d1ca60787d2e5b79322bb9f440215ea086b986ed8d2a8405" ], - "version": "==6.7" + "version": "==1.1.3" }, "flask": { "hashes": [ - "sha256:0749df235e3ff61ac108f69ac178c9770caeaccad2509cb762ce1f65570a8856", - "sha256:49f44461237b69ecd901cc7ce66feea0319b9158743dd27a2899962ab214dac1" + "sha256:a080b744b7e345ccfcbc77954861cb05b3c63786e93f2b3875e0913d44b43f05", + "sha256:2271c0070dbcb5275fad4a82e29f23ab92682dc45f9dfbc22c02ba9b9322ce48" ], - "version": "==0.12.2" + "version": "==1.0.2" }, "itsdangerous": { "hashes": [ @@ -61,6 +81,19 @@ ], "version": "==1.0" }, + "protobuf": { + "hashes": [ + "sha256:3c1e93adccb6df731b003993e1f094221a18694cb767f9a31fa41cc8965c1f80", + "sha256:af61de46537670c0929c74961409935afbefd1a4b77fe17344e83a2854c79ebd", + "sha256:b1b7af2e12c8ec41b12791b6300a418573a0ee4c561e1b28fc9dc42dcc4c0ff5", + "sha256:8d742c2517e54eed287b3fe397140b386763a7b822357a413b3012372657f80b", + "sha256:58abae3c80f93881a803e8e3a669f8c0e437fa00096e3ef6e16b19543da6b836", + "sha256:74665b64cc66d224ea455c88c67a7734877c86b59de55fb21862975cf011181b", + "sha256:55030bccd91a54836b9c551b99234f5efbc9721c8b93c80569fbe94aca0b9c35", + "sha256:a48475035c42d13284fd7bf3a2ffa193f8c472ad1e8539c8444ea7e2d25823a1" + ], + "version": "==3.2.0" + }, "pyserial": { "hashes": [ "sha256:b05fa0d2f5cc5a9584bed5d695441f6dba127c5b593dfa6e671a0db2de0d959a", @@ -68,28 +101,58 @@ ], "version": "==3.2.1" }, + "pyserial-asyncio": { + "hashes": [ + "sha256:861d975029498e777a67c6d6d392e20b84a79c01608f3778c978d0106467c8f8", + "sha256:c40677a8874d8c24d4423a97498746de776f6dbcd0efbb8fa43dcf011a589aee" + ], + "version": "==0.4" + }, + "six": { + "hashes": [ + "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb", + "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9" + ], + "version": "==1.11.0" + }, + "uvloop": { + "hashes": [ + "sha256:f2ffcaa13a5e279d0b3296cd6c691df39876cc818482168a80edd3b0e5deef57", + "sha256:089b3513db7f2122ac00a9ce18be879d626a566537c93bbcbb54053e3f24acf5", + "sha256:d3818242d174a326ea49e2e8f7c1e448432ce17ecb31aeb5084600950857b663", + "sha256:58d6978112ff292cedf2fd754c8085c9a8c6b98737b8ab3cda3d2a081977a91e", + "sha256:0657ebcccb261bdd0a360c83dbc6c1218f13cf5c1a3f381bca68ba5977bb6e5a", + "sha256:c2e04cab3e2c71d79002a814a243bc42f0253eb761b1f3af989d38ec8142532c", + "sha256:4076d40ae0b7557d5982ffe726b153c947a4d70725080ead0121006bdb9f7431", + "sha256:cbab9f6de63b10fc4991cbf9a720a8ceecfba501f5571f35fc3a74c76223ea66", + "sha256:251744b1bb162577db2e48cccb28ec9bad4126591471e6ba63dcbe71abc1c741", + "sha256:a97bd62ebbdf7e6e84bf44afe439d9b24ce4d8661a29a639626a8c03748f6f98" + ], + "version": "==0.11.2" + }, "werkzeug": { "hashes": [ - "sha256:f3000aa146ce8a9da8ca3e978e0e931c2a58eb56c323a5efb6b4307f7832b549", - "sha256:6246e5fc98a505824113fb6aca993d45ea284a2bcffdc2c65d0c538e53e4abd3" + "sha256:d5da73735293558eb1651ee2fddc4d0dedcfa06538b8813a2e20011583c9e49b", + "sha256:c3fd7a7d41976d9f44db327260e263132466836cef6f91512889ed60ad26557c" ], - "version": "==0.13" + "version": "==0.14.1" } }, "develop": { "astroid": { "hashes": [ - "sha256:badf6917ef7eb0ade0ea6eae347aed1e3f8f4c9375a02916f5cc450b3c8a64c0", - "sha256:71dadba2110008e2c03f9fde662ddd2053db3c0489d0e03c94e828a0399edd4f" + "sha256:0ef2bf9f07c3150929b25e8e61b5198c27b0dca195e156f0e4d5bdd89185ca1a", + "sha256:fc9b582dba0366e63540982c3944a9230cbc6f303641c51483fa547dcc22393a" ], - "version": "==1.6.0" + "version": "==1.6.5" }, "isort": { "hashes": [ - "sha256:cd5d3fc2c16006b567a17193edf4ed9830d9454cbeb5a42ac80b36ea00c23db4", - "sha256:79f46172d3a4e2e53e7016e663cc7a8b538bec525c36675fcfd2767df30b3983" + "sha256:ec9ef8f4a9bc6f71eec99e1806bfa2de401650d996c59330782b89a5555c1497", + "sha256:1153601da39a25b14ddc54955dbbacbb6b2d19135386699e2ad58517953b34af", + "sha256:b9c40e9750f3d77e6e4d441d8b0266cf555e7cdabdcff33c4fd06366ca761ef8" ], - "version": "==4.2.15" + "version": "==4.3.4" }, "lazy-object-proxy": { "hashes": [ @@ -134,10 +197,10 @@ }, "pylint": { "hashes": [ - "sha256:c7a3ee11db42d00334671b778f042793c837b73f5883132158284b7dbd6f8184", - "sha256:ea6afb93a9ed810cf52ff3838eb3a15e2bf6a81b80de0eaede1ce442caa5ca69" + "sha256:c8e59da0f2f9990eb00aad1c1de16cd7809315842ebccc3f65ca9df46213df3b", + "sha256:3035e44e37cd09919e9edad5573af01d7c6b9c52a0ebb4781185ae7ab690458b" ], - "version": "==1.7.2" + "version": "==1.8.1" }, "six": { "hashes": [ @@ -151,6 +214,12 @@ "sha256:d4d560d479f2c21e1b5443bbd15fe7ec4b37fe7e53d335d3b9b0a7b1226fe3c6" ], "version": "==1.10.11" + }, + "yappi": { + "hashes": [ + "sha256:5f657129e1b9b952379ffbc009357d0dcdb58c50f3bfe88ffbb992e4b27b263c" + ], + "version": "==0.98" } } } diff --git a/hibike/README.md b/hibike/README.md index 0a9084ba..e1f049ad 100644 --- a/hibike/README.md +++ b/hibike/README.md @@ -1,62 +1,7 @@ -# hibike 2.0! +# Hibike Hibike is a lightweight communications protocol designed for the passing of sensor data for the PiE Robotics Kit, a.k.a Frank or "Kit Minimum" to some. -#### This branch contains documentation and implementation for the 2016-2017 version of the hibike protocol, which should feature iterative improvements over last year's protocol - -#### These suggestions should make their way into protocol documentation first, then implemented in code. The basic read/write functions need to be implemented in Python, and everything needs to be implemented in Arduino, before merging this branch back to develop. - -## Suggested Protocol Changes (Please make suggestions) - -1. The checksum should be changed to something more robust - - Janet suggested using UDP's checksum -2. COBS encoding should be implemented cleaner - - It should be documented and part of the protocol itself, not a wrapper around it - - There should not be a redundant length field - - Only the first 2 bytes of a packet should not be cobs encoded - - 0 byte and packet length -3. Data Update and Device Update/Status should be unified - - Huge advantage: The BBB polling, writing, and subscribing can have identical responses from SD, and runtime can treat them the same - - Protocol can abstract a device as only key value pairs - - Current implementaion has key value pairs and one custom "data update" struct per device - - Custom "data update struct" is nice because it is the exact size we need - - Only Key Value pairs means 32 bits per digital IO pin... - - Does ease of abstraction and implementaion justify larger packet length? - - Is packet length significant anyways? - - 32 bits at 115200 baud is .2 milliseconds? - - Someone should test actual throughput - - Especially how fast BBB can actually receive data - - Even when doing blocking reads byte by byte in python? - - While runtime and student code are running? - - With 20+ devices? - - Should the size of values be unique to reduce packet length? - - Harder to implement - - Both devices need to know the size of each value - - But they needed to know the types anyways so maybe this is ok - - SubRequest can specify which keys it wants to receive - - Each DataUpdate will have to also encode this information to be stateless - - What if we subscribe to more than the max payload size? - - Just respond with as many as you can fit? - - Error Packet? - - A uint16_t bitmask could work? 16 keys is plenty for a device -4. Unused packets should be removed from the protocol - - DescriptionRequest and Response are redundant - - maintaing one config file is better for production - - Do we have a use for the error packet yet? - - Maybe when SD receives write requests for a read only key? - - Maybe when a checksum fails/unexpected 0 byte is seen? - - Can there be infinite loops of back and forth error packets? - - Maybe only SD can send errors, and they'll only be used for logging statistics -5. Hot-Plugging behaviour should be optimized and well-defined - - Current status quo (rebooting BBB to add devices) is unacceptable - - Reenmerate devices every x seconds and also when runtime requests it - - Or runtime can just request it every x seconds - - Hibike can notify runtime when a device disconnects/connects - - Student code accessing disconnected devices should raise a well-defined exception in *student code* - - If a SD disconnects, will BBB find out until it tries reenumerating it? - - If so, should BBB even bother reenumerating SDs it hasn't detected as disconnected? - - ## Section 0: A Quick Introduction @@ -233,9 +178,6 @@ Device Type Enumeration: Note: These assignments are totally random as of now. We need to figure out exactly what devices we are supporting. -Note: As of now, Grizzlies are not supported by Hibike (pyGrizzly should - be used instead) But they should be in the near future, to preserve - the idea of treating every peripheral as a SmartDevice. Error ID Enumeration: @@ -356,7 +298,7 @@ Note: These assignments are also fairly random and may not all even be 8. Heart Beat Response: Sent in response to a Heart Beat Request - This message pathway is a two way street, both BBB and SD can receive requests and send responses to the other - Should only be sent upon receiving a Heart Beat Request - - Payload is currently unused, but can be used for future functionality in keeping track of individual heartbeat requests and responses (for latency purposes) + - The payload is used for flow control; 0 indicates that packets should be sent at full speed, and 100 indicates as slow as possible. Payload format: diff --git a/hibike/hibike_message.py b/hibike/hibike_message.py index d7424603..9ea3d255 100755 --- a/hibike/hibike_message.py +++ b/hibike/hibike_message.py @@ -7,6 +7,9 @@ import os import json import threading +from functools import lru_cache + +from cobs import cobs CONFIG_FILE = open(os.path.join( os.path.dirname(__file__), 'hibikeDevices.json'), 'r') @@ -141,16 +144,18 @@ def checksum(data): return chk -def send(serial_conn, message): +def send(connection, message): """ - Send MESSAGE over SERIAL_CONN. + Send ``message`` over ``connection``. + + This function accepts regular serial ports or asynchronous transports. """ m_buff = message.to_bytes() chk = checksum(m_buff) m_buff.append(chk) encoded = cobs_encode(m_buff) out_buf = bytearray([0x00, len(encoded)]) + encoded - serial_conn.write(out_buf) + connection.write(out_buf) def encode_params(device_id, params): @@ -171,6 +176,7 @@ def encode_params(device_id, params): return mask +@lru_cache(maxsize=128) def decode_params(device_id, params_bitmask): """ Decode PARAMS_BITMASK. @@ -193,7 +199,8 @@ def decode_params(device_id, params_bitmask): return named_params -def format_string(device_id, params): +@lru_cache(maxsize=128) +def format_string_cached(device_id, params): """ A string representation of the types of PARAMS. """ @@ -205,6 +212,14 @@ def format_string(device_id, params): return type_string +def format_string(device_id, params): + """ + Shim for ``format_string``, to ensure all argumets + have hashable types. + """ + return format_string_cached(device_id, tuple(params)) + + def make_ping(): """ Makes and returns Ping message.""" payload = bytearray() @@ -219,6 +234,12 @@ def make_disable(): return message +def make_heartbeat_request(heartbeat_id=0): + """Return a heartbeat request.""" + payload = bytearray(struct.pack(" len(data): - return bytearray() - output.extend(data[index:index + block_size]) - index += block_size - if block_size + 1 < 255 and index < len(data): - output.append(0) - return output + try: + return bytearray(cobs.decode(data)) + except cobs.DecodeError: + return bytearray() class HibikeMessageException(Exception): diff --git a/hibike/hibike_packet_extension b/hibike/hibike_packet_extension new file mode 160000 index 00000000..ba0e7354 --- /dev/null +++ b/hibike/hibike_packet_extension @@ -0,0 +1 @@ +Subproject commit ba0e7354e2d0c7e1dc584c63fdf7f4b88f1be8c2 diff --git a/hibike/hibike_process.py b/hibike/hibike_process.py old mode 100755 new mode 100644 index ed450efd..cc8901b5 --- a/hibike/hibike_process.py +++ b/hibike/hibike_process.py @@ -1,25 +1,30 @@ """ The main Hibike process. """ -from collections import namedtuple +import asyncio import glob -import multiprocessing import os -import queue +import sys import random -import threading import time -import sys -#from PieCentral.runtime.runtimeUtil import * -# from runtimeUtil import * -# pylint: disable=import-error +import serial_asyncio +import aioprocessing +import aiofiles +import uvloop + + import hibike_message as hm -import serial +try: + import hibike_packet + USING_PACKET_EXTENSION = True +except ImportError: + USING_PACKET_EXTENSION = False __all__ = ["hibike_process"] +asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) # .04 milliseconds sleep is the same frequency we subscribe to devices at BATCH_SLEEP_TIME = .04 @@ -28,174 +33,243 @@ # Time in seconds to wait between checking for new devices # and cleaning up old ones. HOTPLUG_POLL_INTERVAL = 1 +# Whether to use profiling or not. On the BBB, profiling adds a significant overhead (~30%). +USE_PROFILING = False +# Where to output profiling statistics. By default, this is in Callgrind format +PROFILING_OUTPUT_FILE = "func_stats" +# The time period to take measurements over, in seconds +PROFILING_PERIOD = 60 +PAUSE_QUEUE_SIZE = 10 +RESUME_QUEUE_SIZE = 2 + +def scan_for_serial_ports(): + """ + Scan for serial ports that look like an Arduino. + """ + # Last command is included so that it's compatible with OS X Sierra + # Note: If you are running OS X Sierra, do not access the directory through vagrant ssh + # Instead access it through Volumes/vagrant/PieCentral + return set(glob.glob("/dev/ttyACM*") +\ + glob.glob("/dev/ttyUSB*") +\ + glob.glob("/dev/tty.usbmodem*")) -def get_working_serial_ports(excludes=()): +async def get_working_serial_ports(event_loop, excludes=()): """ - Scan for open COM ports, except those in EXCLUDES. + Scan for open COM ports, except those in `excludes`. Returns: - A list of serial port objects (`serial.Serial`) and port names. + A list of port names. """ excludes = set(excludes) - # Last command is included so that it's compatible with OS X Sierra - # Note: If you are running OS X Sierra, do not access the directory through vagrant ssh - # Instead access it through Volumes/vagrant/PieCentral - ports = set(glob.glob("/dev/ttyACM*") + glob.glob("/dev/ttyUSB*") - + glob.glob("/dev/tty.usbmodem*")) - ports.difference_update(excludes) + ports = await event_loop.run_in_executor(None, scan_for_serial_ports) try: virtual_device_config_file = os.path.join(os.path.dirname(__file__), "virtual_devices.txt") - ports.update(open(virtual_device_config_file, "r").read().split()) + async with aiofiles.open(virtual_device_config_file, loop=event_loop) as f: + contents = await f.read() + ports.update(contents.split()) except IOError: pass - - serials = [] - port_names = [] - for port in ports: - try: - serials.append(serial.Serial(port, 115200)) - port_names.append(port) - except serial.serialutil.SerialException: - print("Cannot Open Serial Port: " + str(port)) - return serials, port_names + ports.difference_update(excludes) + return list(ports) -def identify_smart_sensors(serial_conns): +async def hotplug_async(devices, batched_data, error_queue, state_queue, event_loop): """ - Given a list of serial port connections, figure out which - contain smart sensors. - - Returns: - A map of serial port names to UIDs. + Scan for new devices on serial ports and automatically spin them up. """ - def recv_subscription_response(conn, uid_queue, stop_event): + pending = set() + def protocol_factory(): """ - Place received subscription response UIDs from CONN into UID_QUEUE, - stopping when STOP_EVENT is set. + Create a `SmartSensorProtocol` with necessary parameters filled in. """ - try: - for packet in hm.blocking_read_generator(conn, stop_event): - msg_type = packet.get_message_id() - if msg_type == hm.MESSAGE_TYPES["SubscriptionResponse"]: - _, _, uid = hm.parse_subscription_response(packet) - uid_queue.put(uid) - except serial.SerialException: - pass - - - device_map = {} - candidates = [] - for conn in serial_conns: - old_timeout = conn.write_timeout - conn.write_timeout = IDENTIFY_TIMEOUT - try: - hm.send(conn, hm.make_ping()) - except serial.SerialTimeoutException: - continue - finally: - conn.write_timeout = old_timeout - maybe_device = namedtuple("MaybeDevice", ["serial_conn", "queue", "event", "thread"]) - maybe_device.queue = queue.Queue() - maybe_device.event = threading.Event() - maybe_device.serial_conn = conn - maybe_device.thread = threading.Thread(target=recv_subscription_response, - args=(conn, maybe_device.queue, maybe_device.event)) - candidates.append(maybe_device) - for cand in candidates: - cand.thread.start() - for cand in candidates: - try: - uid = cand.queue.get(block=True, timeout=IDENTIFY_TIMEOUT) - device_map[cand.serial_conn.name] = uid - # Shut device up - hm.send(cand.serial_conn, hm.make_subscription_request(uid, [], 0)) - except queue.Empty: - pass - for cand in candidates: - cand.event.set() - cand.thread.join() - return device_map - - -def spin_up_device(serial_port, uid, state_queue, batched_data, error_queue): - """ - Spin up a device with a given UID on SERIAL_PORT. + return SmartSensorProtocol(devices, batched_data, error_queue, + state_queue, event_loop, pending) - Returns: - The new device. - """ - pack = namedtuple("Threadpack", ["read_thread", "write_thread", - "write_queue", "serial_port", "instance_id"]) - pack.write_queue = queue.Queue() - pack.serial_port = serial_port - pack.write_thread = threading.Thread(target=device_write_thread, - args=(serial_port, pack.write_queue)) - pack.read_thread = threading.Thread(target=device_read_thread, - args=(uid, pack, error_queue, - state_queue, batched_data)) - # This is an ID that does not persist across disconnects, - # so that we can tell when a device has been reconnected. - pack.instance_id = random.getrandbits(128) - pack.write_thread.start() - pack.read_thread.start() - return pack - - -def hotplug(devices, state_queue, batched_data, error_queue): - """ - Remove disconnected devices and scan for new ones. - """ - clean_up_queue = queue.Queue() - clean_up_thread = threading.Thread(target=clean_up_devices, args=(clean_up_queue, )) - clean_up_thread.start() while True: - time.sleep(HOTPLUG_POLL_INTERVAL) - scan_for_new_devices(devices, state_queue, batched_data, error_queue) - remove_disconnected_devices(error_queue, devices, clean_up_queue, state_queue) - - -def scan_for_new_devices(existing_devices, state_queue, batched_data, error_queue): - """ - Find devices that are on serial ports not in EXISTING_DEVICES, and add - any that have been found to it. + await asyncio.sleep(HOTPLUG_POLL_INTERVAL, loop=event_loop) + port_names = set([dev.transport.serial.name for dev in devices.values()\ + if dev.transport is not None and dev.transport.serial is not None]) + port_names.update(pending) + new_serials = await get_working_serial_ports(event_loop, port_names) + for port in new_serials: + try: + pending.add(port) + await serial_asyncio.create_serial_connection(event_loop, protocol_factory, port, + baudrate=115200) + except serial_asyncio.serial.SerialException: + pass + await remove_disconnected_devices(error_queue, devices, state_queue, event_loop) + + +class SmartSensorProtocol(asyncio.Protocol): """ - ports, names = get_working_serial_ports(map(lambda d: d.serial_port.name, - existing_devices.values())) - sensors = identify_smart_sensors(ports) - for (ser, uid) in sensors.items(): - idx = names.index(ser) - port = ports[idx] - pack = spin_up_device(port, uid, state_queue, batched_data, error_queue) - existing_devices[uid] = pack - # Tell the device to start sending data - pack.write_queue.put(("ping", [])) - pack.write_queue.put(("subscribe", [1, 0, []])) - - -def clean_up_devices(device_queue): + Handle communication over serial with a smart sensor. + + :param dict devices: Mapping from UIDs to devices + :param dict batched_data: Mapping from UIDs to device state + :param asyncio.Queue error_queue: Info about disconnects goes here + :param aioprocessing.Queue state_queue: Connection to StateManager + :param event_loop: The event loop + :param set pending: Set of serial connections that may or may not + have devices on them. """ - Clean up associated resources of devices in the queue. + PACKET_BOUNDARY = bytes([0]) + __slots__ = ("uid", "write_queue", "batched_data", "read_queue", "error_queue", + "state_queue", "instance_id", "transport", "_ready", "serial_buf") + # pylint: disable=too-many-arguments + def __init__(self, devices, batched_data, error_queue, state_queue, event_loop, pending: set): + # We haven't found out what our UID is yet + self.uid = None + + self.write_queue = asyncio.Queue(loop=event_loop) + self.batched_data = batched_data + self.read_queue = asyncio.Queue(loop=event_loop) + self.error_queue = error_queue + self.state_queue = state_queue + self.instance_id = random.getrandbits(128) + + self.transport = None + self._ready = asyncio.Event(loop=event_loop) + if USING_PACKET_EXTENSION: + # pylint: disable=no-member + self.serial_buf = hibike_packet.RingBuffer() + else: + self.serial_buf = bytearray() + + event_loop.create_task(self.register_sensor(event_loop, devices, pending)) + event_loop.create_task(self.send_messages()) + event_loop.create_task(self.recv_messages()) + + async def register_sensor(self, event_loop, devices, pending): + """ + Try to get our UID from the sensor and register it with `hibike_process`. + """ + await self._ready.wait() + hm.send(self.transport, hm.make_ping()) + await asyncio.sleep(IDENTIFY_TIMEOUT, loop=event_loop) + if self.uid is None: + self.quit() + else: + hm.send(self.transport, hm.make_ping()) + hm.send(self.transport, + hm.make_subscription_request(hm.uid_to_device_id(self.uid), [], 0)) + devices[self.uid] = self + pending.remove(self.transport.serial.name) + + async def send_messages(self): + """ + Send messages in the queue to the sensor. + """ + await self._ready.wait() + while not self.transport.is_closing(): + instruction, args = await self.write_queue.get() + if instruction == "ping": + hm.send(self.transport, hm.make_ping()) + elif instruction == "subscribe": + uid, delay, params = args + hm.send(self.transport, + hm.make_subscription_request(hm.uid_to_device_id(uid), + params, delay)) + elif instruction == "read": + uid, params = args + hm.send(self.transport, hm.make_device_read(hm.uid_to_device_id(uid), params)) + elif instruction == "write": + uid, params_and_values = args + hm.send(self.transport, hm.make_device_write(hm.uid_to_device_id(uid), + params_and_values)) + elif instruction == "disable": + hm.send(self.transport, hm.make_disable()) + elif instruction == "heartResp": + uid = args[0] + hm.send(self.transport, hm.make_heartbeat_response(self.read_queue.qsize())) - Closing a serial port can take a very long time (30 seconds or more). - It's best to spin this function off into its own thread, - so that you're not blocked on reclaiming resources. + async def recv_messages(self): + """ + Process received messages. + """ + await self._ready.wait() + while not self.transport.is_closing(): + if self.read_queue.qsize() >= PAUSE_QUEUE_SIZE: + self.transport.pause_reading() + if self.read_queue.qsize() <= RESUME_QUEUE_SIZE: + self.transport.resume_reading() + packet = await self.read_queue.get() + message_type = packet.get_message_id() + if message_type == hm.MESSAGE_TYPES["SubscriptionResponse"]: + params, delay, uid = hm.parse_subscription_response(packet) + self.uid = uid + await self.state_queue.coro_put(("device_subscribed", [uid, delay, params])) + elif message_type == hm.MESSAGE_TYPES["DeviceData"]: + # This is kind of a hack, but it allows us to use `recv_messages` for + # detecting new smart sensors as well as reading from known ones. + if self.uid is not None: + params_and_values = hm.parse_device_data(packet, hm.uid_to_device_id(self.uid)) + self.batched_data[uid] = params_and_values + elif message_type == hm.MESSAGE_TYPES["HeartBeatRequest"]: + if self.uid is not None: + self.write_queue.put_nowait(("heartResp", [self.uid])) + + def connection_made(self, transport): + self.transport = transport + self._ready.set() + + def quit(self): + """ + Stop processing packets and close the serial connection. + """ + self.transport.abort() + + if USING_PACKET_EXTENSION: + def data_received(self, data): + self.serial_buf.extend(data) + # pylint: disable=no-member + maybe_packet = hibike_packet.process_buffer(self.serial_buf) + if maybe_packet is not None: + message_id, payload = maybe_packet + message = hm.HibikeMessage(message_id, payload) + self.read_queue.put_nowait(message) + else: + def data_received(self, data): + self.serial_buf.extend(data) + zero_loc = self.serial_buf.find(self.PACKET_BOUNDARY) + if zero_loc != -1: + self.serial_buf = self.serial_buf[zero_loc:] + packet = hm.parse_bytes(self.serial_buf) + if packet != None: + # Chop off a byte so we don't output this packet again + self.serial_buf = self.serial_buf[1:] + self.read_queue.put_nowait(packet) + elif self.serial_buf.count(self.PACKET_BOUNDARY) > 1: + # If there's another packet in the buffer + # we can safely jump to it for the next iteration + new_packet = self.serial_buf[1:].find(self.PACKET_BOUNDARY) + 1 + self.serial_buf = self.serial_buf[new_packet:] + + def connection_lost(self, exc): + if self.uid is not None: + error = Disconnect(uid=self.uid, instance_id=self.instance_id, accessed=False) + self.error_queue.put_nowait(error) + + +class Disconnect: """ - while True: - device = device_queue.get() - device.serial_port.close() - device.read_thread.join() - device.write_thread.join() + Information about a device disconnect. + """ + def __init__(self, uid, instance_id, accessed): + self.uid = uid + self.instance_id = instance_id + self.accessed = accessed -def remove_disconnected_devices(error_queue, devices, clean_up_queue, state_queue): +async def remove_disconnected_devices(error_queue, devices, state_queue, event_loop): """ - Clean up any disconnected devices in ERROR_QUEUE. + Clean up any disconnected devices in `error_queue`. """ next_time_errors = [] while True: try: - error = error_queue.get(block=False) + error = error_queue.get_nowait() pack = devices[error.uid] if not error.accessed: # Wait until the next cycle to make sure it's disconnected @@ -206,250 +280,126 @@ def remove_disconnected_devices(error_queue, devices, clean_up_queue, state_queu # The device has reconnected in the meantime continue uid = error.uid - pack = devices[uid] del devices[uid] - clean_up_queue.put(pack) - state_queue.put(("device_disconnected", [uid])) - except queue.Empty: + await state_queue.coro_put(("device_disconnected", [uid]), loop=event_loop) + except asyncio.QueueEmpty: for err in next_time_errors: - error_queue.put(err) + error_queue.put_nowait(err) return -# pylint: disable=too-many-branches, too-many-locals -# pylint: disable=too-many-arguments, unused-argument + +async def batch_data(sensor_values, state_queue, event_loop): + """ + Periodically send sensor values to `StateManager`. + """ + while True: + await asyncio.sleep(BATCH_SLEEP_TIME, loop=event_loop) + await state_queue.coro_put(("device_values", [sensor_values]), loop=event_loop) + + +async def print_profiler_stats(event_loop, time_delay): + """ + Print profiler statistics after a number of seconds. + """ + try: + import yappi + except ImportError: + return + await asyncio.sleep(time_delay, loop=event_loop) + print("Printing profiler stats") + yappi.get_func_stats().print_all(out=sys.stdout, + columns={0: ("name", 60), 1: ("ncall", 5), 2: ("tsub", 8), + 3: ("ttot", 8), 4: ("tavg", 8)}) + yappi.get_func_stats().save(PROFILING_OUTPUT_FILE, type="callgrind") + yappi.get_thread_stats().print_all() + + +class QueueContext: + """ + Stub to force aioprocessing to use an existing queue. + """ + def __init__(self, queue): + self._queue = queue + + # pylint: disable=invalid-name + def Queue(self, _size): + return self._queue + + def hibike_process(bad_things_queue, state_queue, pipe_from_child): """ Run the main hibike processs. """ - serials, serial_names = get_working_serial_ports() - smart_sensors = identify_smart_sensors(serials) - devices = {} + pipe_from_child = aioprocessing.AioConnection(pipe_from_child) + # By default, AioQueue instantiates a new Queue object, but we + # don't want that. + state_queue = aioprocessing.AioQueue(context=QueueContext(state_queue)) + bad_things_queue = aioprocessing.AioQueue(context=QueueContext(bad_things_queue)) + devices = {} batched_data = {} - error_queue = queue.Queue() - - for (ser, uid) in smart_sensors.items(): - index = serial_names.index(ser) - serial_port = serials[index] - pack = spin_up_device(serial_port, uid, state_queue, batched_data, error_queue) - devices[uid] = pack - - batch_thread = threading.Thread(target=batch_data, args=(batched_data, state_queue)) - batch_thread.start() - hotplug_thread = threading.Thread(target=hotplug, - args=(devices, state_queue, batched_data, error_queue)) - hotplug_thread.start() + event_loop = asyncio.get_event_loop() + error_queue = asyncio.Queue(loop=event_loop) + + event_loop.create_task(batch_data(batched_data, state_queue, event_loop)) + event_loop.create_task(hotplug_async(devices, batched_data, error_queue, + state_queue, event_loop)) + event_loop.create_task(dispatch_instructions(devices, bad_things_queue, state_queue, + pipe_from_child, event_loop)) + # start event loop + if USE_PROFILING: + try: + import yappi + yappi.start() + event_loop.create_task(print_profiler_stats(event_loop, PROFILING_PERIOD)) + except ImportError: + print("Unable to import profiler. Make sure you installed with the '--dev' flag.") - # Pings all devices and tells them to stop sending data - for pack in devices.values(): - pack.write_queue.put(("ping", [])) - pack.write_queue.put(("subscribe", [1, 0, []])) + event_loop.run_forever() - # the main thread reads instructions from statemanager and - # forwards them to the appropriate device write threads +async def dispatch_instructions(devices, bad_things_queue, state_queue, + pipe_from_child, event_loop): + """ + Respond to instructions from `StateManager`. + """ path = os.path.dirname(os.path.abspath(__file__)) parent_path = path.rstrip("hibike") runtime = os.path.join(parent_path, "runtime") sys.path.insert(1, runtime) + # Pylint doesn't understand our import shenanigans + # pylint: disable=import-error import runtimeUtil while True: - instruction, args = pipe_from_child.recv() + instruction, args = await pipe_from_child.coro_recv(loop=event_loop) try: if instruction == "enumerate_all": for pack in devices.values(): - pack.write_queue.put(("ping", [])) + pack.write_queue.put_nowait(("ping", [])) elif instruction == "subscribe_device": uid = args[0] - if uid in devices: - devices[uid].write_queue.put(("subscribe", args)) + devices[uid].write_queue.put_nowait(("subscribe", args)) elif instruction == "write_params": uid = args[0] - if uid in devices: - devices[uid].write_queue.put(("write", args)) + devices[uid].write_queue.put_nowait(("write", args)) elif instruction == "read_params": uid = args[0] - if uid in devices: - devices[uid].write_queue.put(("read", args)) + devices[uid].write_queue.put_nowait(("read", args)) elif instruction == "disable_all": for pack in devices.values(): - pack.write_queue.put(("disable", [])) + pack.write_queue.put_nowait(("disable", [])) elif instruction == "timestamp_down": - timestamp = time.perf_counter() + timestamp = time.time() args.append(timestamp) - state_queue.put(("timestamp_up", args)) + await state_queue.coro_put(("timestamp_up", args), loop=event_loop) except KeyError as e: - bad_things_queue.put(runtimeUtil.BadThing( + await bad_things_queue.coro_put(runtimeUtil.BadThing( sys.exc_info(), str(e), event=runtimeUtil.BAD_EVENTS.HIBIKE_NONEXISTENT_DEVICE)) except TypeError as e: - bad_things_queue.put(runtimeUtil.BadThing( + await bad_things_queue.coro_put(runtimeUtil.BadThing( sys.exc_info(), str(e), event=runtimeUtil.BAD_EVENTS.HIBIKE_INSTRUCTION_ERROR)) - - -def device_write_thread(ser, instr_queue): - """ - Send packets to SER based on instructions from INSTR_QUEUE. - """ - try: - while True: - instruction, args = instr_queue.get() - - if instruction == "ping": - hm.send(ser, hm.make_ping()) - elif instruction == "subscribe": - uid, delay, params = args - hm.send(ser, hm.make_subscription_request(hm.uid_to_device_id(uid), params, delay)) - elif instruction == "read": - uid, params = args - hm.send(ser, hm.make_device_read(hm.uid_to_device_id(uid), params)) - elif instruction == "write": - uid, params_and_values = args - hm.send(ser, hm.make_device_write(hm.uid_to_device_id(uid), params_and_values)) - elif instruction == "disable": - hm.send(ser, hm.make_disable()) - elif instruction == "heartResp": - uid = args[0] - hm.send(ser, hm.make_heartbeat_response()) - except serial.SerialException: - # Device has disconnected - pass - - -def device_read_thread(uid, pack, error_queue, state_queue, batched_data): - """ - Read packets from SER and update queues and BATCHED_DATA accordingly. - """ - ser = pack.serial_port - instruction_queue = pack.write_queue - try: - while True: - for packet in hm.blocking_read_generator(ser): - message_type = packet.get_message_id() - if message_type == hm.MESSAGE_TYPES["SubscriptionResponse"]: - params, delay, uid = hm.parse_subscription_response(packet) - state_queue.put(("device_subscribed", [uid, delay, params])) - elif message_type == hm.MESSAGE_TYPES["DeviceData"]: - params_and_values = hm.parse_device_data(packet, hm.uid_to_device_id(uid)) - batched_data[uid] = params_and_values - elif message_type == hm.MESSAGE_TYPES["HeartBeatRequest"]: - instruction_queue.put(("heartResp", [uid])) - except serial.SerialException: - error = namedtuple("Disconnect", ["uid", "instance_id", "accessed"]) - error.uid = uid - error.instance_id = pack.instance_id - error.accessed = False - error_queue.put(error) - -def batch_data(data, state_queue): - """ - Write out DATA to STATE_QUEUE periodically. - """ - while True: - time.sleep(BATCH_SLEEP_TIME) - state_queue.put(("device_values", [data])) - -############# -## TESTING ## -############# -# pylint: disable=invalid-name -if __name__ == "__main__": - # helper functions so we can spawn threads that try to read/write to hibike_devices periodically - def set_interval_sequence(functions, sec): - """ - Create a thread that executes FUNCTIONS after SEC seconds. - """ - def func_wrapper(): - """ - Execute the next function in FUNCTIONS after SEC seconds. - - Cycles through all functions. - """ - set_interval_sequence(functions[1:] + functions[:1], sec) - functions[0]() - t = threading.Timer(sec, func_wrapper) - t.start() - return t - - def make_send_write(pipe_to_child, uid, params_and_values): - """ - Create a function that sends UID and PARAMS_AND_VALUES - to PIPE_TO_CHILD. - """ - def helper(): - """ - Helper function. - """ - pipe_to_child.send(["write_params", [uid, params_and_values]]) - return helper - - to_child, from_child = multiprocessing.Pipe() - main_error_queue = multiprocessing.Queue() - main_state_queue = multiprocessing.Queue() - newProcess = multiprocessing.Process(target=hibike_process, - name="hibike_sim", - args=[main_error_queue, main_state_queue, from_child]) - newProcess.daemon = True - newProcess.start() - to_child.send(["enumerate_all", []]) - uids = set() - while True: - print("waiting for command") - command, main_args = main_state_queue.get() - if command == "device_subscribed": - dev_uid = main_args[0] - if dev_uid not in uids: - uids.add(dev_uid) - if hm.DEVICES[hm.uid_to_device_id(dev_uid)]["name"] == "YogiBear": - set_interval_sequence([ - make_send_write(to_child, dev_uid, [("duty_cycle", 0)]), - make_send_write(to_child, dev_uid, [("duty_cycle", 0.5)]), - make_send_write(to_child, dev_uid, [("duty_cycle", 1.0)]), - make_send_write(to_child, dev_uid, [("duty_cycle", 0)]), - make_send_write(to_child, dev_uid, [("duty_cycle", -0.5)]), - make_send_write(to_child, dev_uid, [("duty_cycle", -1.0)]), - make_send_write(to_child, dev_uid, [("duty_cycle", 0)]) - ], 0.75) - elif hm.DEVICES[hm.uid_to_device_id(dev_uid)]["name"] == "ServoControl": - set_interval_sequence([ - make_send_write(to_child, dev_uid, - [("servo0", 1), ("enable0", False), - ("servo1", 21), ("enable1", True), - ("servo2", 30), ("enable2", True), - ("servo3", 8), ("enable3", True)]), - make_send_write(to_child, dev_uid, - [("servo0", 5), ("enable0", False), - ("servo1", 5), ("enable1", True), - ("servo2", 5), ("enable2", True), - ("servo3", 5), ("enable3", False)]), - make_send_write(to_child, dev_uid, - [("servo0", 1), ("enable0", True), - ("servo1", 26), ("enable1", True), - ("servo2", 30), ("enable2", False), - ("servo3", 17), ("enable3", True)]), - make_send_write(to_child, dev_uid, - [("servo0", 13), ("enable0", False), - ("servo1", 7), ("enable1", False), - ("servo2", 24), ("enable2", True), - ("servo3", 10), ("enable3", True)]), - make_send_write(to_child, dev_uid, - [("servo0", 27), ("enable0", True), - ("servo1", 2), ("enable1", False), - ("servo2", 3), ("enable2", False), - ("servo3", 14), ("enable3", False)]), - make_send_write(to_child, dev_uid, - [("servo0", 20), ("enable0", True), - ("servo1", 12), ("enable1", False), - ("servo2", 20), ("enable2", False), - ("servo3", 29), ("enable3", True)]), - ], 1) - parameters = [] - for param in hm.DEVICES[hm.uid_to_device_id(dev_uid)]["params"]: - parameters.append(param["name"]) - to_child.send(["subscribe_device", [dev_uid, 10, parameters]]) - elif command == "device_values": - print("%10.2f, %s" % (time.time(), str(main_args))) diff --git a/hibike/hibike_tester.py b/hibike/hibike_tester.py index 70dbd6f3..15ac5a9e 100644 --- a/hibike/hibike_tester.py +++ b/hibike/hibike_tester.py @@ -3,6 +3,7 @@ """ import threading import time +import queue from multiprocessing import Process, Pipe, Queue import hibike_process @@ -23,8 +24,9 @@ def __init__(self): self.hibike_process.daemon = True self.hibike_process.start() self.uids = set() - out_thread = threading.Thread(target=self.process_output) - out_thread.start() + self.terminating = threading.Event() + self.out_thread = threading.Thread(target=self.process_output) + self.out_thread.start() self.device_values_cache = {} def process_output(self): @@ -39,8 +41,11 @@ def process_output(self): If it's a device value, cache it in the dictionary. """ - while True: - command, data = self.state_queue.get() + while not self.terminating.is_set(): + try: + command, data = self.state_queue.get(timeout=1) + except queue.Empty: + continue if command == "device_subscribed": uid = data[0] self.uids.add(uid) @@ -116,6 +121,14 @@ def disable(self): """ self.pipe_to_child.send(["disable_all", []]) + def terminate(self): + """ + Terminate the hibike process and clean up resources. + """ + self.hibike_process.terminate() + self.terminating.set() + self.out_thread.join() + # pylint: disable=too-many-branches, too-many-statements def run_test(): comms = Hibike() diff --git a/hibike/hibike_tests/hibike_process_tests.py b/hibike/hibike_tests/hibike_process_tests.py index dd766189..a1bb1d40 100644 --- a/hibike/hibike_tests/hibike_process_tests.py +++ b/hibike/hibike_tests/hibike_process_tests.py @@ -1,48 +1,113 @@ """ Unit tests for functions in hibike_process. """ -import unittest +import asyncio +import os +import random +import sys import time +import unittest + +import aioprocessing import serial + from spawn_virtual_devices import spawn_device, get_virtual_ports -from hibike_process import identify_smart_sensors +from hibike_process import hotplug_async +from hibike_tests.utils import AsyncTestCase import hibike_message as hm +from hibike_tester import Hibike + + +def add_runtime_to_path(): + """ + Enable import of runtime modules. + """ + path = os.path.dirname(os.path.abspath(__file__)) + parent_path = path.rstrip("hibike_tests").rstrip("hibike/") + runtime = os.path.join(parent_path, "runtime") + sys.path.insert(1, runtime) + + +add_runtime_to_path() +# We must import runtimeUtil to deserialize error messages +# pylint: disable=import-error, wrong-import-position, unused-import +import runtimeUtil + + + -class IdentifySmartSensorsTests(unittest.TestCase): +VIRTUAL_DEVICE_STARTUP_TIME = 2 +VIRTUAL_DEVICE_CONFIG_FILE = "virtual_devices.txt" + + +def spawn_virtual_devices(device_types): + """ + Spawn some virtual devices, wait for them to spin up, + then tell Hibike about them. + + :param: device_types the types of the devices to spawn + """ + device_ports = [] + for dev_type in device_types: + device_ports.append(spawn_device(dev_type)) + time.sleep(VIRTUAL_DEVICE_STARTUP_TIME) + write_out_virtual_devices(device_ports) + + +def write_out_virtual_devices(device_ports): """ - Tests for `identify_smart_sensors`. + Tell Hibike about virtual devices located on serial ports. + """ + with open(VIRTUAL_DEVICE_CONFIG_FILE, "w") as vdev_file: + vdev_file.write(" ".join(device_ports)) + vdev_file.flush() + + +class BasicHotplugTests(AsyncTestCase): + """ + Tests for `hotplug_async`. """ VIRTUAL_DEVICE_TYPES = ["LimitSwitch", "YogiBear", "YogiBear", - "RFID", "LineFollower", "BatteryBuzzer", - "Potentiometer"] - VIRTUAL_DEVICE_STARTUP_TIME = 2 + "RFID", "BatteryBuzzer"] + IDENTIFY_TIMEOUT = 5 + + def setUp(self): + super(BasicHotplugTests, self).setUp() + self.devices = {} + self.error_queue = asyncio.Queue(loop=self.loop) + self.state_queue = aioprocessing.AioQueue() + + def identify_devices(self): + """ + Try to identify virtual devices. + """ + hotplug = self.loop.create_task(hotplug_async(self.devices, {}, self.error_queue, + self.state_queue, self.loop)) + self.run_until_timeout(hotplug, self.IDENTIFY_TIMEOUT) + def assert_all_devices_identified(self, identified_devices, msg=None): """ Assert that IDENTIFIED_DEVICES contains all devices in VIRTUAL_DEVICE_TYPES. """ - device_ids = map(hm.uid_to_device_id, identified_devices.values()) + device_ids = map(hm.uid_to_device_id, identified_devices) found_types = map(lambda dev: hm.DEVICES[dev]["name"], device_ids) self.assertListEqual(sorted(self.VIRTUAL_DEVICE_TYPES), sorted(found_types), msg) def test_detect_devices(self): """ Test detection of valid devices. """ - virtual_devices = [] - for vdev_type in self.VIRTUAL_DEVICE_TYPES: - virtual_devices.append(serial.Serial(spawn_device(vdev_type))) - # Wait for virtual devices to spin up - time.sleep(self.VIRTUAL_DEVICE_STARTUP_TIME) - found = identify_smart_sensors(virtual_devices) - self.assert_all_devices_identified(found, "did not identify all sensors") + spawn_virtual_devices(self.VIRTUAL_DEVICE_TYPES) + self.identify_devices() + self.assert_all_devices_identified(self.devices, "did not identify all sensors") def test_detect_no_devices(self): """ Make sure that we don't detect empty serial ports as sensors. """ ports = [] for _ in range(5): ports.append(serial.Serial(get_virtual_ports()[1])) - - found = identify_smart_sensors(ports) - self.assertEqual(found, {}, "found smart sensor where there was none") + write_out_virtual_devices(list(map(lambda p: p.name, ports))) + self.identify_devices() + self.assertEqual(self.devices, {}, "found smart sensor where there was none") def test_detect_some_devices(self): """ @@ -55,7 +120,80 @@ def test_detect_some_devices(self): ports.append(serial.Serial(get_virtual_ports()[1])) for vdev_type in self.VIRTUAL_DEVICE_TYPES: devices.append(serial.Serial(spawn_device(vdev_type))) - time.sleep(self.VIRTUAL_DEVICE_STARTUP_TIME) - found = identify_smart_sensors(ports + devices) - self.assert_all_devices_identified(found, + write_out_virtual_devices(list(map(lambda d: d.name, devices))) + time.sleep(VIRTUAL_DEVICE_STARTUP_TIME) + self.identify_devices() + self.assert_all_devices_identified(self.devices, "identified devices differs from spawned devices") + + +class ReadWriteTests(unittest.TestCase): + """ + Test reading from and writing to devices. + """ + VIRTUAL_DEVICE_TYPES = ["RFID", "LimitSwitch", "BatteryBuzzer", + "YogiBear", "YogiBear", "YogiBear"] + HIBIKE_STARTUP_TIME = 2 + # Reads and writes take time to take effect + READ_WRITE_DELAY = 0.25 + # Reads and writes can ocasionally fail; for reliability, + # do them multiple times + READ_WRITE_ATTEMPTS = 5 + + def setUp(self): + spawn_virtual_devices(self.VIRTUAL_DEVICE_TYPES) + self.hibike = Hibike() + time.sleep(self.HIBIKE_STARTUP_TIME) + self.hibike.enumerate() + + def tearDown(self): + self.hibike.terminate() + + def write(self, uid: int, params_and_values): + """ + Send ``params_and_values`` to Hibike for writing. + """ + for _ in range(self.READ_WRITE_ATTEMPTS): + self.hibike.write(uid, params_and_values) + time.sleep(self.READ_WRITE_DELAY) + + def read(self, uid: int, param: str): + """ + Read ``param`` from Hibike and return the result. + """ + for _ in range(self.READ_WRITE_ATTEMPTS): + self.hibike.read(uid, [param]) + time.sleep(self.READ_WRITE_DELAY) + return self.hibike.get_last_cached(uid, param) + + def test_write_then_read(self): + """ + Writing a value to a device then reading from it should produce + the same value. + """ + for (uid, dev_type) in self.hibike.get_uids_and_types(): + if dev_type == "YogiBear": + duty_cycle_val = random.random() + self.write(uid, [("duty_cycle", duty_cycle_val)]) + hibike_value = self.read(uid, "duty_cycle") + self.assertAlmostEqual(hibike_value, duty_cycle_val) + + def test_subscribe_write(self): + """ + Test that subscribing sends up to date values. + """ + self.hibike.subscribe_all() + for (uid, dev_type) in self.hibike.get_uids_and_types(): + if dev_type == "YogiBear": + self.write(uid, [("duty_cycle", random.random())]) + new_val = random.random() + self.write(uid, [("duty_cycle", new_val)]) + new_hibike_val = self.hibike.get_last_cached(uid, "duty_cycle") + self.assertAlmostEqual(new_hibike_val, new_val) + + def test_nonexistent_device(self): + """ + Nonexistent device operations should error. + """ + self.read("0x123456789", "duty_cycle") + self.hibike.bad_things_queue.get(block=False) diff --git a/hibike/hibike_tests/utils.py b/hibike/hibike_tests/utils.py index d57d7670..3433f981 100644 --- a/hibike/hibike_tests/utils.py +++ b/hibike/hibike_tests/utils.py @@ -1,6 +1,8 @@ """ General utilities for unit tests. """ +import asyncio +import unittest def run_with_random_data(func, arg_func, kwarg_func=lambda: {}, times=5): """ @@ -11,3 +13,42 @@ def run_with_random_data(func, arg_func, kwarg_func=lambda: {}, times=5): """ for _ in range(times): func(*arg_func(), **kwarg_func()) + + +class AsyncTestCase(unittest.TestCase): + """ + A test case that creates an event loop before a test and + stops it afterwards. Useful for unit testing coroutines. + """ + def setUp(self): + self.loop = asyncio.new_event_loop() + + def tearDown(self): + try: + # Python will deprecate and remove ``Task.all_tasks`` in the future, + # but not all versions have ``asyncio.all_tasks``, so just do both. + tasks = asyncio.Task.all_tasks(self.loop) + except AttributeError: + # pylint: disable=no-member + tasks = asyncio.all_tasks(self.loop) + for t in tasks: + t.cancel() + try: + self.loop.run_until_complete(asyncio.gather(*tasks, loop=self.loop)) + except asyncio.CancelledError: + pass + self.loop.stop() + + def run_until_timeout(self, task, timeout): + """ + Run ``coro`` for up to ``timeout`` seconds. + """ + async def stop_event_loop(): + await asyncio.sleep(timeout, loop=self.loop) + + stop_task = self.loop.create_task(stop_event_loop()) + # Note: Yes, pending is not used, but if you remove it, + # async complains about cancelled coroutines + # pylint: disable=unused-variable + _, pending = self.loop.run_until_complete(asyncio.wait([task, stop_task], + return_when=asyncio.FIRST_COMPLETED)) diff --git a/hibike/lib/hibike/hibike_device.cpp b/hibike/lib/hibike/hibike_device.cpp index 905b5f42..c8d18b03 100644 --- a/hibike/lib/hibike/hibike_device.cpp +++ b/hibike/lib/hibike/hibike_device.cpp @@ -15,6 +15,23 @@ uint64_t prev_sub_time, curr_time, sent_heartbeat, resp_heartbeat, prevHeartTime led_state heartbeat_state; bool led_enabled; +// Set this when a heartbeat response is received. +static uint8_t queue_fullness; +// Set this to the maximum tolerable subscription delay. +const float MAX_SUB_DELAY_MS = 250.0f; +// Set this to the minimum tolerable subscription delay. +const float MIN_SUB_DELAY_MS = 40.0f; +// Tune this. +const float ALPHA = 0.25f; + +void update_sub_delay(void) { + // Clamp queue_fullness just in case it's greater than 100 + queue_fullness = min(queue_fullness, 100); + // Interpolate between the maximum and minimum delay + float new_delay = max(MAX_SUB_DELAY_MS * queue_fullness / 100, MIN_SUB_DELAY_MS); + sub_delay = (uint16_t)(ALPHA * sub_delay + (1 - ALPHA) * new_delay); +} + void hibike_setup(uint32_t _disable_latency, uint32_t _heartbeat_delay) { //heartbeat_delay to 0 to not send heartbeat Requests //disable_latency to 0 to not disable on lack of heartbeats. @@ -100,6 +117,9 @@ void hibike_loop() { case HEART_BEAT_RESPONSE: resp_heartbeat = curr_time; + // update sub delay with the queue fullness + queue_fullness = *((uint8_t*) &hibike_buff.payload[1]); + update_sub_delay(); break; case PING: diff --git a/hibike/lib/hibike/hibike_message.cpp b/hibike/lib/hibike/hibike_message.cpp index 78685ebf..76df3aec 100644 --- a/hibike/lib/hibike/hibike_message.cpp +++ b/hibike/lib/hibike/hibike_message.cpp @@ -215,21 +215,21 @@ uint16_t uint16_from_message(message_t* msg, uint8_t* offset) { } uint32_t uint32_from_message(message_t* msg, uint8_t* offset) { uint32_t res = (msg->payload[*offset + 0] & 0xFF) << 0; - res |= (msg->payload[*offset + 1] & 0xFF) << 8; - res |= (msg->payload[*offset + 2] & 0xFF) << 16; - res |= (msg->payload[*offset + 3] & 0xFF) << 24; + res |= (uint32_t)(msg->payload[*offset + 1] & 0xFF) << 8; + res |= (uint32_t)(msg->payload[*offset + 2] & 0xFF) << 16; + res |= (uint32_t)(msg->payload[*offset + 3] & 0xFF) << 24; *offset += sizeof(res); return res; } uint64_t uint64_from_message(message_t* msg, uint8_t* offset) { uint64_t res = (msg->payload[*offset + 0] & 0xFF) << 0; - res |= (msg->payload[*offset + 1] & 0xFF) << 8; - res |= (msg->payload[*offset + 2] & 0xFF) << 16; - res |= (msg->payload[*offset + 3] & 0xFF) << 24; - res |= (msg->payload[*offset + 4] & 0xFF) << 32; - res |= (msg->payload[*offset + 5] & 0xFF) << 40; - res |= (msg->payload[*offset + 6] & 0xFF) << 48; - res |= (msg->payload[*offset + 7] & 0xFF) << 56; + res |= (uint64_t)(msg->payload[*offset + 1] & 0xFF) << 8; + res |= (uint64_t)(msg->payload[*offset + 2] & 0xFF) << 16; + res |= (uint64_t)(msg->payload[*offset + 3] & 0xFF) << 24; + res |= (uint64_t)(msg->payload[*offset + 4] & 0xFF) << 32; + res |= (uint64_t)(msg->payload[*offset + 5] & 0xFF) << 40; + res |= (uint64_t)(msg->payload[*offset + 6] & 0xFF) << 48; + res |= (uint64_t)(msg->payload[*offset + 7] & 0xFF) << 56; *offset += sizeof(res); return res; } diff --git a/hibike/travis/Makefile b/hibike/travis/Makefile index d7a0ad32..3028c1cf 100644 --- a/hibike/travis/Makefile +++ b/hibike/travis/Makefile @@ -12,7 +12,7 @@ artifacts-install: $(nop) lint: - cd .. && find . -name "*.py" | xargs pylint + cd .. && find . -type d -name "hibike_packet_extension" -prune -o -type f -name "*.py" -print | xargs pylint unit_tests: cd .. && python3 -m unittest hibike_tests/*.py diff --git a/hibike/virtual_device.py b/hibike/virtual_device.py index da5ab433..f30829c4 100755 --- a/hibike/virtual_device.py +++ b/hibike/virtual_device.py @@ -8,17 +8,182 @@ 2016/09/20 21:29:03 socat[4165] N starting data transfer loop with FDs [3,3] and [5,5] $ python3.5 virtual_device.py -d LimitSwitch -p /dev/pts/26 """ -import random -import time import argparse +import asyncio +import json +import random import struct +import time # pylint: disable=import-error -import serial +import serial_asyncio import hibike_message as hm -# pylint: disable=too-many-statements, too-many-locals, too-many-branches -# pylint: disable=unused-variable + +# Format of default values storage: +# {"DeviceName": {"param1": value1, "param2": value2}} +DEFAULT_VALUES_FILE = "virtual_device_defaults.json" +with open(DEFAULT_VALUES_FILE) as f: + DEFAULT_VALUES = json.load(f) + + +class VirtualDevice(asyncio.Protocol): + """ + A fake Hibike smart sensor. + """ + HEARTBEAT_DELAY_MS = 100 + PACKET_BOUNDARY = bytes([0]) + def __init__(self, uid, event_loop, verbose=False): + self.uid = uid + self.event_loop = event_loop + self._ready = asyncio.Event(loop=event_loop) + self.serial_buf = bytearray() + self.read_queue = asyncio.Queue(loop=event_loop) + self.verbose = verbose + + self.update_time = 0 + self.delay = 0 + self.transport = None + + self.response_map = { + hm.MESSAGE_TYPES["Ping"]: self._process_ping, + hm.MESSAGE_TYPES["SubscriptionRequest"]: self._process_sub_request, + hm.MESSAGE_TYPES["DeviceRead"]: self._process_device_read, + hm.MESSAGE_TYPES["DeviceWrite"]: self._process_device_write, + hm.MESSAGE_TYPES["Disable"]: self._process_disable, + hm.MESSAGE_TYPES["HeartBeatResponse"]: self._process_heartbeat_response, + } + self.param_values = DEFAULT_VALUES[hm.uid_to_device_name(uid)] + self.subscribed_params = set() + + event_loop.create_task(self.process_messages()) + event_loop.create_task(self.send_subscribed_params()) + event_loop.create_task(self.request_heartbeats()) + + def data_received(self, data): + self.serial_buf.extend(data) + zero_loc = self.serial_buf.find(bytes([0])) + if zero_loc != -1: + self.serial_buf = self.serial_buf[zero_loc:] + packet = hm.parse_bytes(self.serial_buf) + if packet != None: + self.serial_buf = self.serial_buf[1:] + self.read_queue.put_nowait(packet) + elif self.serial_buf.count(self.PACKET_BOUNDARY) > 1: + new_packet = self.serial_buf[1:].find(self.PACKET_BOUNDARY) + 1 + self.serial_buf = self.serial_buf[new_packet:] + + + def verbose_log(self, fmt_string, *fmt_args): + """Log a message if verbosity is enabled.""" + if self.verbose: + print(fmt_string.format(*fmt_args)) + + async def process_messages(self): + """Process recieved messages.""" + await self._ready.wait() + while not self.transport.is_closing(): + msg = await self.read_queue.get() + msg_type = msg.get_message_id() + if msg_type not in self.response_map: + continue + self.response_map[msg_type](msg) + + async def send_subscribed_params(self): + """Send values of subscribed parameters at a regular interval.""" + await self._ready.wait() + device_id = hm.uid_to_device_id(self.uid) + while not self.transport.is_closing(): + await asyncio.sleep(0.005, loop=self.event_loop) + if self.update_time != 0 and self.delay != 0: + if time.time() - self.update_time >= self.delay * 0.001: + # If the time equal to the delay has elapsed since the previous device data, + # send a device data with the device id + # and the device's subscribed params and values + data = [] + for param in self.subscribed_params: + data.append((param, self.param_values[param])) + hm.send(self.transport, hm.make_device_data(device_id, data)) + self.update_time = time.time() + self.verbose_log("Regular data update sent from {}", + hm.uid_to_device_name(self.uid)) + + async def request_heartbeats(self): + """Request heartbeats on a regular basis.""" + await self._ready.wait() + while not self.transport.is_closing(): + await asyncio.sleep(self.HEARTBEAT_DELAY_MS/1000, loop=self.event_loop) + hm.send(self.transport, hm.make_heartbeat_request()) + + # pylint: disable=unused-argument + def _process_ping(self, msg): + """Respond to a ping packet.""" + self.verbose_log("Ping received") + dev_id = hm.uid_to_device_id(self.uid) + hm.send(self.transport, hm.make_subscription_response(dev_id, [], 0, self.uid)) + + def _process_sub_request(self, msg): + """Respond to a subscription request with an appropriate response.""" + self.update_time = time.time() + dev_id = hm.uid_to_device_id(self.uid) + self.verbose_log("Subscription request received") + params, delay = struct.unpack("= delay * 0.001: - # If the time equal to the delay has elapsed since the previous device data, - # send a device data with the device id - # and the device's subscribed params and values - data = [] - for data_tuple in params_and_values: - if data_tuple[0] in subscribed_params and hm.readable(device_id, data_tuple[0]): - data.append(data_tuple) - hm.send(conn, hm.make_device_data(device_id, data)) - update_time = time.time() - verbose_log("Regular data update sent from {}", device) - - msg = hm.read(conn) - if not msg: - time.sleep(.005) - continue - if msg.get_message_id() in [hm.MESSAGE_TYPES["SubscriptionRequest"]]: - # Update the delay, subscription time, - # and params, then send a subscription response - verbose_log("Subscription request received") - params, delay = struct.unpack(" # diff --git a/runtime/Pipfile b/runtime/Pipfile index ca271d4c..55a9974d 100644 --- a/runtime/Pipfile +++ b/runtime/Pipfile @@ -2,13 +2,23 @@ url = "https://pypi.python.org/simple" verify_ssl = true + [requires] python_version = "3.7" + [packages] click = "*" protobuf = "==3.2.0" pyserial = "==3.2.1" +flask = "*" +aioprocessing = "*" +pyserial-asyncio = "*" +aiofiles = "*" +uvloop = "*" +cobs = "*" + [dev-packages] pylint = "==1.8.1" +yappi = "*" diff --git a/runtime/Pipfile.lock b/runtime/Pipfile.lock index 1886daa4..113acb64 100644 --- a/runtime/Pipfile.lock +++ b/runtime/Pipfile.lock @@ -1,7 +1,20 @@ { "_meta": { "hash": { - "sha256": "bb858c5d59497be42ce93c55c72e7a51bdc8143217c76908fc9ce5b046312f6a" + "sha256": "f22def7bd4ef502618688e3e966ee763d3a413a4cbbf837f3cf18c360bf09436" + }, + "host-environment-markers": { + "implementation_name": "cpython", + "implementation_version": "3.6.6", + "os_name": "posix", + "platform_machine": "x86_64", + "platform_python_implementation": "CPython", + "platform_release": "4.15.0-36-generic", + "platform_system": "Linux", + "platform_version": "#39~16.04.1-Ubuntu SMP Tue Sep 25 08:59:23 UTC 2018", + "python_full_version": "3.6.6", + "python_version": "3.6", + "sys_platform": "linux" }, "pipfile-spec": 6, "requires": { @@ -15,12 +28,58 @@ ] }, "default": { + "aiofiles": { + "hashes": [ + "sha256:1e644c2573f953664368de28d2aa4c89dfd64550429d0c27c4680ccd3aa4985d", + "sha256:021ea0ba314a86027c166ecc4b4c07f2d40fc0f4b3a950d1868a0f2571c2bbee" + ], + "version": "==0.4.0" + }, + "aioprocessing": { + "hashes": [ + "sha256:9b88f4e51b7358e7a53ea2b6cfd1e000848270a1b09532a410b6d38e015b7de5", + "sha256:b6952b476586c2a2e0d2802f42d73e6898ab474213ffda788d720a3fb57b01fb" + ], + "version": "==1.0.1" + }, "click": { "hashes": [ - "sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d", - "sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b" + "sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13", + "sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7" + ], + "version": "==7.0" + }, + "cobs": { + "hashes": [ + "sha256:ae3319b5704d9269d1ca60787d2e5b79322bb9f440215ea086b986ed8d2a8405" + ], + "version": "==1.1.3" + }, + "flask": { + "hashes": [ + "sha256:a080b744b7e345ccfcbc77954861cb05b3c63786e93f2b3875e0913d44b43f05", + "sha256:2271c0070dbcb5275fad4a82e29f23ab92682dc45f9dfbc22c02ba9b9322ce48" + ], + "version": "==1.0.2" + }, + "itsdangerous": { + "hashes": [ + "sha256:cbb3fcf8d3e33df861709ecaf89d9e6629cff0a217bc2848f1b41cd30d360519" + ], + "version": "==0.24" + }, + "jinja2": { + "hashes": [ + "sha256:74c935a1b8bb9a3947c50a54766a969d4846290e1e788ea44c1392163723c3bd", + "sha256:f84be1bb0040caca4cea721fcbbbbd61f9be9464ca236387158b0feea01914a4" + ], + "version": "==2.10" + }, + "markupsafe": { + "hashes": [ + "sha256:a6be69091dac236ea9c6bc7d012beab42010fa914c459791d627dad4910eb665" ], - "version": "==6.7" + "version": "==1.0" }, "protobuf": { "hashes": [ @@ -41,12 +100,41 @@ ], "version": "==3.2.1" }, + "pyserial-asyncio": { + "hashes": [ + "sha256:861d975029498e777a67c6d6d392e20b84a79c01608f3778c978d0106467c8f8", + "sha256:c40677a8874d8c24d4423a97498746de776f6dbcd0efbb8fa43dcf011a589aee" + ], + "version": "==0.4" + }, "six": { "hashes": [ "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9", "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb" ], "version": "==1.11.0" + }, + "uvloop": { + "hashes": [ + "sha256:f2ffcaa13a5e279d0b3296cd6c691df39876cc818482168a80edd3b0e5deef57", + "sha256:089b3513db7f2122ac00a9ce18be879d626a566537c93bbcbb54053e3f24acf5", + "sha256:d3818242d174a326ea49e2e8f7c1e448432ce17ecb31aeb5084600950857b663", + "sha256:58d6978112ff292cedf2fd754c8085c9a8c6b98737b8ab3cda3d2a081977a91e", + "sha256:0657ebcccb261bdd0a360c83dbc6c1218f13cf5c1a3f381bca68ba5977bb6e5a", + "sha256:c2e04cab3e2c71d79002a814a243bc42f0253eb761b1f3af989d38ec8142532c", + "sha256:4076d40ae0b7557d5982ffe726b153c947a4d70725080ead0121006bdb9f7431", + "sha256:cbab9f6de63b10fc4991cbf9a720a8ceecfba501f5571f35fc3a74c76223ea66", + "sha256:251744b1bb162577db2e48cccb28ec9bad4126591471e6ba63dcbe71abc1c741", + "sha256:a97bd62ebbdf7e6e84bf44afe439d9b24ce4d8661a29a639626a8c03748f6f98" + ], + "version": "==0.11.2" + }, + "werkzeug": { + "hashes": [ + "sha256:d5da73735293558eb1651ee2fddc4d0dedcfa06538b8813a2e20011583c9e49b", + "sha256:c3fd7a7d41976d9f44db327260e263132466836cef6f91512889ed60ad26557c" + ], + "version": "==0.14.1" } }, "develop": { @@ -59,11 +147,10 @@ }, "isort": { "hashes": [ + "sha256:ec9ef8f4a9bc6f71eec99e1806bfa2de401650d996c59330782b89a5555c1497", "sha256:1153601da39a25b14ddc54955dbbacbb6b2d19135386699e2ad58517953b34af", - "sha256:b9c40e9750f3d77e6e4d441d8b0266cf555e7cdabdcff33c4fd06366ca761ef8", - "sha256:ec9ef8f4a9bc6f71eec99e1806bfa2de401650d996c59330782b89a5555c1497" + "sha256:b9c40e9750f3d77e6e4d441d8b0266cf555e7cdabdcff33c4fd06366ca761ef8" ], - "markers": "python_version != '3.3.*' and python_version != '3.2.*' and python_version != '3.0.*' and python_version >= '2.7' and python_version != '3.1.*'", "version": "==4.3.4" }, "lazy-object-proxy": { @@ -126,6 +213,12 @@ "sha256:d4d560d479f2c21e1b5443bbd15fe7ec4b37fe7e53d335d3b9b0a7b1226fe3c6" ], "version": "==1.10.11" + }, + "yappi": { + "hashes": [ + "sha256:5f657129e1b9b952379ffbc009357d0dcdb58c50f3bfe88ffbb992e4b27b263c" + ], + "version": "==0.98" } } } diff --git a/runtime/runtime.py b/runtime/runtime.py index 08d8e163..8486292e 100644 --- a/runtime/runtime.py +++ b/runtime/runtime.py @@ -375,7 +375,7 @@ def terminate_process(process_name): return process = ALL_PROCESSES.pop(process_name) process.terminate() - for _ in range(100): # Gives 1 sec for process to terminate + for _ in range(300): # Gives 3 seconds for process to terminate time.sleep(.01) # Give the OS a chance to terminate the other process if not process.is_alive(): break diff --git a/runtime/runtimeUtil.py b/runtime/runtimeUtil.py index 6df3bc4f..783b3c32 100644 --- a/runtime/runtimeUtil.py +++ b/runtime/runtimeUtil.py @@ -7,7 +7,7 @@ import os import json -__version__ = (1, 2, 0) +__version__ = (1, 3, 0) class AutoIntEnum(IntEnum): diff --git a/runtime/studentapi.py b/runtime/studentapi.py index 7d8f75fa..9075332f 100644 --- a/runtime/studentapi.py +++ b/runtime/studentapi.py @@ -142,6 +142,19 @@ def set_value(self, device_name, param, value): self._check_value(param, value) self.to_manager.put([HIBIKE_COMMANDS.WRITE, [uid, [(param, value)]]]) + def set_motor(self, device_name, value): + uid = self._hibike_get_uid(device_name) + self._check_write_params(uid, "duty_cycle") + self._check_value("duty_cycle", value) + self.to_manager.put([HIBIKE_COMMANDS.WRITE, [uid, [("duty_cycle", value)]]]) + + def stop_motor(self, device_name): + uid = self._hibike_get_uid(device_name) + self._check_write_params(uid, "duty_cycle") + self._check_value("duty_cycle", 0) + self.to_manager.put([HIBIKE_COMMANDS.WRITE, [uid, [("duty_cycle", 0)]]]) + + def run(self, func, *args, **kwargs): """ Starts a "coroutine", i.e. a series of actions that proceed diff --git a/runtime/test_async.py b/runtime/test_async.py new file mode 100644 index 00000000..31072f3e --- /dev/null +++ b/runtime/test_async.py @@ -0,0 +1,106 @@ +import datetime +import heapq +import types +import time + + +class Task: + """Represent how long a coroutine should wait before starting again. + + Comparison operators are implemented for use by heapq. Two-item + tuples unfortunately don't work because when the datetime.datetime + instances are equal, comparison falls to the coroutine and they don't + implement comparison methods, triggering an exception. + + Think of this as being like asyncio.Task/curio.Task. + """ + + def __init__(self, wait_until, coro): + self.coro = coro + self.waiting_until = wait_until + + def __eq__(self, other): + return self.waiting_until == other.waiting_until + + def __lt__(self, other): + return self.waiting_until < other.waiting_until + + +class SleepingLoop: + """An event loop focused on delaying execution of coroutines. + + Think of this as being like asyncio.BaseEventLoop/curio.Kernel. + """ + + def __init__(self, *coros): + self._new = coros + self._waiting = [] + + def run_until_complete(self): + # Start all the coroutines. + for coro in self._new: + wait_for = coro.send(None) + heapq.heappush(self._waiting, Task(wait_for, coro)) + # Keep running until there is no more work to do. + while self._waiting: + now = datetime.datetime.now() + # Get the coroutine with the soonest resumption time. + task = heapq.heappop(self._waiting) + if now < task.waiting_until: + # We're ahead of schedule; wait until it's time to resume. + delta = task.waiting_until - now + time.sleep(delta.total_seconds()) + now = datetime.datetime.now() + try: + # It's time to resume the coroutine. + wait_until = task.coro.send(now) + heapq.heappush(self._waiting, Task(wait_until, task.coro)) + except StopIteration: + # The coroutine is done. + pass + + +@types.coroutine +def sleep(seconds): + """Pause a coroutine for the specified number of seconds. + + Think of this as being like asyncio.sleep()/curio.sleep(). + """ + now = datetime.datetime.now() + wait_until = now + datetime.timedelta(seconds=seconds) + # Make all coroutines on the call stack pause; the need to use `yield` + # necessitates this be generator-based and not an async-based coroutine. + actual = yield wait_until + # Resume the execution stack, sending back how long we actually waited. + return actual - now + + +async def countdown(label, length, *args, delay=0): + """Countdown a launch for `length` seconds, waiting `delay` seconds. + + This is what a user would typically write. + """ + print(label, 'waiting', delay, 'seconds before starting countdown') + delta = await sleep(delay) + print(label, 'starting after waiting', delta) + while length: + print(label, 'T-minus', length) + waited = await sleep(1) + length -= 1 + print(label, 'lift-off!') + + +def main(): + """Start the event loop, counting down 3 separate launches. + + This is what a user would typically write. + """ + loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2), + countdown('C', 4, delay=1)) + start = datetime.datetime.now() + loop.run_until_complete() + print('Total elapsed time is', datetime.datetime.now() - start) + + +if __name__ == '__main__': + main()