Skip to content

Commit

Permalink
Add 2FA support
Browse files Browse the repository at this point in the history
  • Loading branch information
schmittx committed Mar 4, 2024
1 parent e7bdb0d commit f8d3872
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 39 deletions.
74 changes: 62 additions & 12 deletions custom_components/leviton_decora_smart_wifi/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@
from typing import Any

import json
import logging
import os
import requests

from .residence import Residence
from .const import (
API_ENDPOINT,
LOGIN_CODE_INVALID,
LOGIN_CODE_REQUIRED,
LOGIN_FAILED,
LOGIN_SUCCESS,
LOGIN_TOO_MANY_ATTEMPTS,
)

_LOGGER = logging.getLogger(__name__)


class LevitonException(Exception):
def __init__(self, status_code: int, name: str, message: str) -> None:
super(LevitonException, self).__init__()
self.status_code = status_code
self.name = name
self.message = message
_LOGGER.debug(f"\n- LevitionException\n- Status: {self.status_code}\n- Name: {self.name}\n- Message: {self.message}")


class LevitonAPI(object):
Expand All @@ -33,9 +42,10 @@ def __init__(
self.save_location = save_location
self.user_id = user_id

self.data = []
self.credentials: dict = {}
self.data: list = []
self.session = requests.Session()
self.user_name = None
self.user_name: str = None

def call(
self,
Expand Down Expand Up @@ -64,27 +74,59 @@ def call(
self.save_response(response=response, name=url)
return response

def login(self, email: str, password: str) -> bool:
def login(self, email: str, password: str, code: str | None = None) -> str:
try:
data = {"email": email, "password": password}
if code:
data["code"] = code
response = self.call(
method="post",
url="person/login",
params={"include": "user"},
data={"email": email, "password": password},
data=data,
)
self.authorization = response["id"]
self.user_id = response["user"]["id"]
self.user_name = "{} {}".format(
response["user"]["firstName"],
response["user"]["lastName"],
)
except LevitonException:
return False
return True
except LevitonException as exception:
if all(
[
exception.status_code == 401,
exception.message == "Login Failed",
]
):
return LOGIN_FAILED
elif all(
[
exception.status_code == 403,
exception.message == "Too many failed attempts",
]
):
return LOGIN_TOO_MANY_ATTEMPTS
elif all(
[
exception.status_code == 406,
exception.message == "Insufficient Data: Person uses two factor authentication. Requires code.",
]
):
return LOGIN_CODE_REQUIRED
elif all(
[
exception.status_code == 408,
exception.message == "Error: Invalid code",
]
):
return LOGIN_CODE_INVALID
return LOGIN_FAILED
self.credentials = data
return LOGIN_SUCCESS

def parse_response(self, response: requests.Response) -> dict[str, Any] | None:
data = json.loads(response.text)
if data is dict and data.get("error"):
if response.status_code not in [200]:
error = data["error"]
raise LevitonException(
status_code=error.get("statusCode"),
Expand All @@ -97,11 +139,19 @@ def refresh(self, function):
try:
return function()
except LevitonException as exception:
if (exception.status_code == 401 and exception.message == "Invalid Access Token"):
self.login()
if all(
[
exception.status_code == 401,
exception.message == "Invalid Access Token",
]
):
self.login(
email=self.credentials["email"],
password=self.credentials["password"],
code=self.credentials.get("code"),
)
return function()
else:
raise exception
raise exception

def save_response(self, response: dict[str, Any], name: str = "response") -> None:
if self.save_location and response:
Expand Down
6 changes: 6 additions & 0 deletions custom_components/leviton_decora_smart_wifi/api/const.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
"""Leviton API"""
API_ENDPOINT = "https://my.leviton.com/api"

LOGIN_CODE_INVALID = "login_code_invalid"
LOGIN_CODE_REQUIRED = "login_code_required"
LOGIN_FAILED = "login_failed"
LOGIN_SUCCESS = "login_success"
LOGIN_TOO_MANY_ATTEMPTS = "login_too_many_attempts"

BULB_THRESHOLD_NORMAL = "normal"
BULB_THRESHOLD_MEDIUM = "medium"
BULB_THRESHOLD_EXTENDED = "extended"
Expand Down
84 changes: 60 additions & 24 deletions custom_components/leviton_decora_smart_wifi/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from homeassistant import config_entries
from homeassistant.const import (
CONF_CODE,
CONF_EMAIL,
CONF_ID,
CONF_NAME,
Expand All @@ -14,7 +15,7 @@
from homeassistant.core import callback
import homeassistant.helpers.config_validation as cv

from .api import LevitonAPI, LevitonException
from .api import LOGIN_CODE_REQUIRED, LOGIN_SUCCESS, LevitonAPI, LevitonException
from .const import (
CONF_DEVICES,
CONF_RESIDENCES,
Expand Down Expand Up @@ -45,6 +46,21 @@ def __init__(self):
self.response = None
self.user_input = {}

async def async_finish_login(self, errors):
await self.async_set_unique_id(self.api.user_id)
self._abort_if_unique_id_configured()

try:
self.response = await self.hass.async_add_executor_job(self.api.update)
except LevitonException as exception:
errors["base"] = "update_failed"

self.user_input[CONF_ID] = self.api.user_id
self.user_input[CONF_NAME] = self.api.user_name
self.user_input[CONF_TOKEN] = self.api.authorization

return await self.async_step_residences()

async def async_step_user(self, user_input=None):
errors = {}

Expand All @@ -54,29 +70,19 @@ async def async_step_user(self, user_input=None):
self.user_input[CONF_PASSWORD] = user_input[CONF_PASSWORD]
self.api = LevitonAPI()

try:
await self.hass.async_add_executor_job(
self.api.login, user_input[CONF_EMAIL], user_input[CONF_PASSWORD],
)
except LevitonException as exception:
_LOGGER.error(f"Status: {exception.status}, Error Message: {exception.error_message}")
errors["base"] = "invalid_login"

await self.async_set_unique_id(self.api.user_id)
self._abort_if_unique_id_configured()
result = await self.hass.async_add_executor_job(
self.api.login,
self.user_input[CONF_EMAIL],
self.user_input[CONF_PASSWORD],
)

try:
self.response = await self.hass.async_add_executor_job(self.api.update)
except LevitonException as exception:
_LOGGER.error(f"Status: {exception.status}, Error Message: {exception.error_message}")
errors["base"] = "update_failed"

self.user_input[CONF_ID] = self.api.user_id
self.user_input[CONF_EMAIL] = user_input[CONF_EMAIL]
self.user_input[CONF_NAME] = self.api.user_name
self.user_input[CONF_TOKEN] = self.api.authorization

return await self.async_step_residences()
if result == LOGIN_CODE_REQUIRED:
_LOGGER.debug(f"Two factor authentication is required for the account")
return await self.async_step_authenticate()
elif result == LOGIN_SUCCESS:
_LOGGER.debug(f"Login successful")
return await self.async_finish_login(errors)
errors["base"] = result

return self.async_show_form(
step_id="user",
Expand All @@ -89,6 +95,37 @@ async def async_step_user(self, user_input=None):
errors=errors,
)

async def async_step_authenticate(self, user_input=None):
errors = {}

if user_input is not None:

self.user_input[CONF_CODE] = user_input[CONF_CODE]
self.api = LevitonAPI()

result = await self.hass.async_add_executor_job(
self.api.login,
self.user_input[CONF_EMAIL],
self.user_input[CONF_PASSWORD],
self.user_input[CONF_CODE],
)

if result == LOGIN_SUCCESS:
_LOGGER.debug(f"Login successful")
return await self.async_finish_login(errors)
errors["base"] = result

return self.async_show_form(
step_id="authenticate",
data_schema=vol.Schema(
{
vol.Required(CONF_CODE): cv.string,
}
),
description_placeholders={"email": self.user_input[CONF_EMAIL]},
errors=errors,
)

async def async_step_residences(self, user_input=None):
errors = {}

Expand All @@ -114,7 +151,6 @@ async def async_step_residences(self, user_input=None):
vol.Required(CONF_RESIDENCES, default=residence_names): cv.multi_select(residence_names),
}
),
description_placeholders={"user_name": self.user_input[CONF_NAME]},
errors=errors,
)

Expand Down
2 changes: 1 addition & 1 deletion custom_components/leviton_decora_smart_wifi/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
"loggers": [],
"quality_scale": "gold",
"requirements": ["pypng==0.20220715.0", "PyQRCode==1.2.1"],
"version": "1.3.2"
"version": "1.4.0"
}
13 changes: 12 additions & 1 deletion custom_components/leviton_decora_smart_wifi/strings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
"already_configured": "The desired account is already configured"
},
"error": {
"invalid_login": "Invalid login",
"login_code_invalid": "Invalid code",
"login_code_required": "Code required",
"login_failed": "Failed",
"login_success": "Success",
"login_too_many_attempts": "Too many attempts",
"update_failed": "Update failed"
},
"step": {
Expand All @@ -16,6 +20,13 @@
"description": "Enter the email address and password associated with your account.",
"title": "Login to My Leviton account"
},
"authenticate": {
"data": {
"code": "Code"
},
"description": "Enter the code that was sent to {email}.",
"title": "Two factor authentication required"
},
"residences": {
"data": {
"residences": "Residences"
Expand Down
13 changes: 12 additions & 1 deletion custom_components/leviton_decora_smart_wifi/translations/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
"already_configured": "The desired account is already configured"
},
"error": {
"invalid_login": "Invalid login",
"login_code_invalid": "Invalid code",
"login_code_required": "Code required",
"login_failed": "Failed",
"login_success": "Success",
"login_too_many_attempts": "Too many attempts",
"update_failed": "Update failed"
},
"step": {
Expand All @@ -16,6 +20,13 @@
"description": "Enter the email address and password associated with your account.",
"title": "Login to My Leviton account"
},
"authenticate": {
"data": {
"code": "Code"
},
"description": "Enter the code that was sent to {email}.",
"title": "Two factor authentication required"
},
"residences": {
"data": {
"residences": "Residences"
Expand Down

0 comments on commit f8d3872

Please sign in to comment.