Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/franka_shutdown #4

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
78 changes: 9 additions & 69 deletions __init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,18 @@
# https://github.com/frankaemika/libfranka/issues/63
# https://github.com/ib101/DVK/blob/master/Code/DVK.py

import requests
from urllib.parse import urljoin
import hashlib
import base64
import argparse
from time import sleep
from itertools import count
import atexit
from time import sleep
from threading import Event
import atexit
import argparse
from urllib.parse import urljoin
from franka_client import FrankaClient


class FrankaLockUnlock:
class FrankaLockUnlock(FrankaClient):
def __init__(self, hostname: str, username: str, password: str, protocol: str = 'https', relock: bool = False):
requests.packages.urllib3.disable_warnings()
self._session = requests.Session()
self._session.verify = False
self._hostname = f'{protocol}://{hostname}'
self._username = username
self._password = password
self._logged_in = False
self._token = None
self._token_id = None
super().__init__(hostname, username, password, protocol=protocol)
self._relock = relock
atexit.register(self._cleanup)

Expand All @@ -44,57 +35,6 @@ def _cleanup(self):
self._logout()
print("Successfully cleaned up.")

@staticmethod
def _encode_password(username, password):
bs = ','.join([str(b) for b in hashlib.sha256((f'{password}#{username}@franka').encode('utf-8')).digest()])
return base64.encodebytes(bs.encode('utf-8')).decode('utf-8')

def _login(self):
print("Logging in...")
if self._logged_in:
print("Already logged in.")
return
login = self._session.post(urljoin(self._hostname, '/admin/api/login'), \
json={'login': self._username, \
'password': self._encode_password(self._username, self._password)})
assert login.status_code == 200, "Error logging in."
self._session.cookies.set('authorization', login.text)
self._logged_in = True
print("Successfully logged in.")

def _logout(self):
print("Logging out...")
assert self._logged_in
logout = self._session.post(urljoin(self._hostname, '/admin/api/logout'))
assert logout.status_code == 200, "Error logging out"
self._session.cookies.clear()
self._logged_in = False
print("Successfully logged out.")

def _get_active_token_id(self):
token_query = self._session.get(urljoin(self._hostname, '/admin/api/control-token'))
assert token_query.status_code == 200, "Error getting control token status."
json = token_query.json()
return None if json['activeToken'] is None else json['activeToken']['id']

def _is_active_token(self):
active_token_id = self._get_active_token_id()
return active_token_id is None or active_token_id == self._token_id

def _request_token(self, physically=False):
print("Requesting a control token...")
if self._token is not None:
assert self._token_id is not None
print("Already having a control token.")
return
token_request = self._session.post(urljoin(self._hostname, f'/admin/api/control-token/request{"?force" if physically else ""}'), \
json={'requestedBy': self._username})
assert token_request.status_code == 200, "Error requesting control token."
json = token_request.json()
self._token = json['token']
self._token_id = json['id']
print(f'Received control token is {self._token} with id {self._token_id}.')

def _release_token(self):
print("Releasing control token...")
token_delete = self._session.delete(urljoin(self._hostname, '/admin/api/control-token'), \
Expand Down Expand Up @@ -137,7 +77,7 @@ def run(self, unlock: bool = False, force: bool = False, wait: bool = False, req
while True:
self._request_token(physically=request)
try:
# Consider the timeout of 30 s for requesting physical access to the robot
# Consider the timeout of 20 s for requesting physical access to the robot
for _ in range(20) if request else count():
if (not wait and not request) or self._is_active_token():
print('Successfully acquired control over the robot.')
Expand Down
85 changes: 85 additions & 0 deletions franka_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright jk-ethz
# Released under GNU AGPL-3.0
# Contact us for other licensing options.

# Developed and tested on system version
# 4.2.1

# Inspired by
# https://github.com/frankaemika/libfranka/issues/63
# https://github.com/ib101/DVK/blob/master/Code/DVK.py

from abc import ABC, abstractmethod
import hashlib
import base64
import requests
from urllib.parse import urljoin
from http import HTTPStatus


class FrankaClient(ABC):
def __init__(self, hostname: str, username: str, password: str, protocol: str = 'https'):
requests.packages.urllib3.disable_warnings()
self._session = requests.Session()
self._session.verify = False
self._hostname = f'{protocol}://{hostname}'
self._username = username
self._password = password
self._logged_in = False
self._token = None
self._token_id = None

@staticmethod
def _encode_password(username, password):
bs = ','.join([str(b) for b in hashlib.sha256((f'{password}#{username}@franka').encode('utf-8')).digest()])
return base64.encodebytes(bs.encode('utf-8')).decode('utf-8')

def _login(self):
print("Logging in...")
if self._logged_in:
print("Already logged in.")
return
login = self._session.post(urljoin(self._hostname, '/admin/api/login'), \
json={'login': self._username, \
'password': self._encode_password(self._username, self._password)})
assert login.status_code == HTTPStatus.OK, "Error logging in."
self._session.cookies.set('authorization', login.text)
self._logged_in = True
print("Successfully logged in.")

def _logout(self):
print("Logging out...")
assert self._logged_in
logout = self._session.post(urljoin(self._hostname, '/admin/api/logout'))
assert logout.status_code == HTTPStatus.OK, "Error logging out"
self._session.cookies.clear()
self._logged_in = False
print("Successfully logged out.")

def _get_active_token_id(self):
token_query = self._session.get(urljoin(self._hostname, '/admin/api/control-token'))
assert token_query.status_code == HTTPStatus.OK, "Error getting control token status."
json = token_query.json()
return None if json['activeToken'] is None else json['activeToken']['id']

def _is_active_token(self):
active_token_id = self._get_active_token_id()
return active_token_id is None or active_token_id == self._token_id

def _request_token(self, physically=False):
print("Requesting a control token...")
if self._token is not None:
assert self._token_id is not None
print("Already having a control token.")
return
token_request = self._session.post(urljoin(self._hostname, f'/admin/api/control-token/request{"?force" if physically else ""}'), \
json={'requestedBy': self._username})
assert token_request.status_code == HTTPStatus.OK, "Error requesting control token."
json = token_request.json()
self._token = json['token']
self._token_id = json['id']
print(f'Received control token is {self._token} with id {self._token_id}.')

@abstractmethod
def run(self) -> None:
pass
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
packages=[],
scripts=[
'__init__.py',
'shutdown.py',
],
install_requires=[]
)
Expand Down
78 changes: 78 additions & 0 deletions shutdown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/env python3
# Copyright jk-ethz
# Released under GNU AGPL-3.0
# Contact us for other licensing options.

# Developed and tested on system version
# 4.2.1

# Inspired by
# https://github.com/frankaemika/libfranka/issues/63
# https://github.com/ib101/DVK/blob/master/Code/DVK.py

from itertools import count
from time import sleep
import argparse
from requests.exceptions import ConnectionError
from urllib.parse import urljoin
from franka_client import FrankaClient


class FrankaShutdown(FrankaClient):
def __init__(self, hostname: str, username: str, password: str, protocol: str = 'https'):
super().__init__(hostname, username, password, protocol=protocol)

def _shutdown(self):
print("Shutting down...")
assert self._is_active_token(), "Cannot shutdown without an active control token."
try:
self._session.post(urljoin(self._hostname, '/admin/api/shutdown'), json={'token': self._token})
except ConnectionError as _:
# Sometimes, the server can shut down before sending a response, possibly raising an exception.
# Anyways, the server has still received the request, thus the robot shutdown procedure will start.
# So, we can ignore the cases when these exceptions are raised.
pass
finally:
print("The robot is shutting down. Please wait for the yellow lights to turn off, then switch the control box off.")

def run(self, wait: bool = False, request: bool = False) -> None:
assert not request or wait, "Requesting control without waiting for obtaining control is not supported."
self._login()
try:
assert self._token is not None or self._get_active_token_id() is None or wait, "Error requesting control, the robot is currently in use."
while True:
self._request_token(physically=False)
try:
# Consider the timeout of 20 s for requesting physical access to the robot
for _ in range(20) if request else count():
if (not wait and not request) or self._is_active_token():
print('Successfully acquired control over the robot.')
return
if request:
print('Please press the button with the (blue) circle on the robot to confirm physical access.')
elif wait:
print('Please confirm the request message in the web interface on the logged in user.')
sleep(1)
# In case physical access was not confirmed, try again
self._release_token()
finally:
self._shutdown()
finally:
pass


if __name__ == '__main__':
parser = argparse.ArgumentParser(
prog = 'FrankaShutdown',
description = 'Shutdown the Franka Emika Panda programmatically.',
epilog = '(c) jk-ethz, https://github.com/jk-ethz'
)
parser.add_argument('hostname', help='The Franka Desk IP address or hostname, for example "1.2.3.4".')
parser.add_argument('username', help='The Franka Desk username, usually "admin".')
parser.add_argument('password', help='The Franka Desk password.')
parser.add_argument('-w', '--wait', action='store_true', help='Wait in case the robot web UI is currently in use.')
parser.add_argument('-r', '--request', action='store_true', help='Request control by confirming physical access to the robot in case the robot web UI is currently in use.')
args, _ = parser.parse_known_args()

franka_lock_unlock = FrankaShutdown(hostname=args.hostname, username=args.username, password=args.password)
franka_lock_unlock.run(wait=args.wait, request=args.request)