Skip to content

Python library for communicating with RuuviTag BLE Sensor Beacon and for decoding sensor data from broadcasted eddystone-url.

License

Notifications You must be signed in to change notification settings

kymwatts/ruuvitag-sensor

 
 

Repository files navigation

RuuviTag Sensor Python Package

Codeship Status for ttu/ruuvitag-sensor PyPI

RuuviTag Sensor is a Python library for communicating with RuuviTag BLE Sensor Beacon and for decoding sensord data from broadcasted eddystone-url.

Requirements

  • RuuviTag with Weather Station firmware
  • Python 2.7 and 3
    • psutil
      • Package uses psutil to start and stop processes. Psutil requires sudo apt-get install python-dev or sudo apt-get install python3-dev
  • Linux
    • Package's Windows and OSX supports are only for testing and url decoding
  • Bluez
    • sudo apt-get install bluez bluez-hcidump
  • Superuser rights
    • Package uses internally hciconf, hcitool and hcidump, which require superuser rights

Installation

Install latest released version

$ pip install ruuvitag_sensor

Install latest developement version

$ pip install git+https://github.com/ttu/ruuvitag-sensor
# Or clone this repository and install locally
$ pip install -e .

Full installation guide for Raspberry PI & Raspbian

Usage

RuuviTag sensors can be identified using MAC addresses.

Get data from sensor
from ruuvitag_sensor.ruuvi import RuuviTagSensor

sensor = RuuviTagSensor('AA:2C:6A:1E:59:3D')

# update state from the device
state = sensor.update()

# get latest state (does not get it from the device)
state = sensor.state

print(state)
Get sensor datas with callback

get_datas calls the callback every time when a RuuviTag sensor broadcasts data

from ruuvitag_sensor.ruuvi import RuuviTagSensor

def handle_data(found_data):
    print('MAC ' + found_data[0])
    print(found_data[1])

RuuviTagSensor.get_datas(handle_data)

Optional list of macs and run flag can be passed to the get_datas function. Callback is called only for macs in the list and setting run flag to false will stop execution. If run flag is not passed, function will execute forever.

from ruuvitag_sensor.ruuvi import RuuviTagSensor, RunFlag

counter = 10
# RunFlag for stopping execution at desired time
run_flag = RunFlag()

def handle_data(found_data):
    print('MAC ' + found_data[0])
    print(found_data[1])
    global counter
    counter = counter - 1
    if counter < 0:
        run_flag.running = False

# List of macs of sensors which will execute callback function
macs = ['AA:2C:6A:1E:59:3D', 'CC:2C:6A:1E:59:3D']

RuuviTagSensor.get_datas(handle_data, macs, run_flag)
Get data for specified sensors

get_data_for_sensors will collect latest data from sensors for specified duration.

from ruuvitag_sensor.ruuvi import RuuviTagSensor

# List of macs of sensors which data will be collected
# If list is empty, data will be collected for all found sensors
macs = ['AA:2C:6A:1E:59:3D', 'CC:2C:6A:1E:59:3D']
# get_data_for_sensors will look data for the duration of timeout_in_sec
timeout_in_sec = 4

datas = RuuviTagSensor.get_data_for_sensors(macs, timeout_in_sec)

# Dictionary will have lates data for each sensor
print(datas['AA:2C:6A:1E:59:3D'])
print(datas['CC:2C:6A:1E:59:3D'])
RuuviTagReactive

Reactive wrapper and background process for RuuviTagSensor get_datas. Optional MAC address list can be passed on initializer and execution can be stopped with stop function.

from ruuvitag_sensor.ruuvi_rx import RuuviTagReactive

ruuvi_rx = RuuviTagReactive()

# Print all notifications
ruuvi_rx.get_subject().\
    subscribe(print)

# Print only last data every 10 seconds for F4:A5:74:89:16:57
ruuvi_rx.get_subject().\
    filter(lambda x: x[0] == 'F4:A5:74:89:16:57').\
    buffer_with_time(10000).\
    subscribe(lambda datas: print(datas[len(datas) - 1]))

# Execute only every time when temperature changes for F4:A5:74:89:16:57
ruuvi_rx.get_subject().\
    filter(lambda x: x[0] == 'F4:A5:74:89:16:57').\
    map(lambda x: x[1]['temperature']).\
    distinct_until_changed().\
    subscribe(lambda x: print('Temperature changed: {}'.format(x)))

More samples and simple HTTP server under examples directory.

Check official documentation from RxPy GitHub and RxPY Public API

Find sensors

find_ruuvitags function will exeute forever and when new RuuviTag sensor is found it will print it's MAC address and state at that moment. This function can be used with a command line applications.

from ruuvitag_sensor.ruuvi import RuuviTagSensor

RuuviTagSensor.find_ruuvitags()
Parse data
from ruuvitag_sensor.ruuvi import RuuviTagSensor
from ruuvitag_sensor.url_decoder import UrlDecoder

full_data = '043E2A0201030157168974A5F41E0201060303AAFE1616AAFE10EE037275752E76692341412C3E672B49246AB9'
data = full_data[26:]

encoded = RuuviTagSensor.convert_data(data)

sensor_data = UrlDecoder().decode_data(encoded)

print(sensor_data)
Command line
$ python ruuvitag_sensor -h

usage: ruuvitag_sensor [-h] [-g MAC_ADDRESS] [-f] [-l] [-s] [--version]

optional arguments:
  -h, --help            show this help message and exit
  -g MAC_ADDRESS, --get MAC_ADDRESS
                        Get data
  -f, --find            Find broadcasting RuuviTags
  -l, --latest          Get latest data for found RuuviTags
  -s, --stream          Stream broadcasts from all RuuviTags
  --version             show program's version number and exit

Tests

Tests use unittest.mock library, so Python 3.3. or newer is required.

Run with nosetests

$ pip install nose
$ nosetests

Run with setup

$ python setup.py test

Examples

Examples are in examples directory.

Change log

Change Log

License

Licensed under the MIT License.

About

Python library for communicating with RuuviTag BLE Sensor Beacon and for decoding sensor data from broadcasted eddystone-url.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 100.0%