Skip to content

Commit

Permalink
add: computer title generation for zero-touch deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
st3v3nmw committed Feb 1, 2024
1 parent 340e8fc commit cd05b56
Show file tree
Hide file tree
Showing 11 changed files with 499 additions and 9 deletions.
55 changes: 55 additions & 0 deletions landscape/client/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
"""
import getpass
import io
import json
import os
import pwd
import shlex
import subprocess
import sys
import textwrap
from datetime import datetime
from datetime import timezone
from functools import partial
from urllib.parse import urlparse

Expand All @@ -23,14 +27,17 @@
from landscape.client.reactor import LandscapeReactor
from landscape.client.serviceconfig import ServiceConfig
from landscape.client.serviceconfig import ServiceConfigException
from landscape.client.snap_utils import get_snap_info
from landscape.lib import base64
from landscape.lib.amp import MethodCallError
from landscape.lib.bootstrap import BootstrapDirectory
from landscape.lib.bootstrap import BootstrapList
from landscape.lib.compat import input
from landscape.lib.fetch import fetch
from landscape.lib.fetch import FetchError
from landscape.lib.format import expandvars
from landscape.lib.fs import create_binary_file
from landscape.lib.network import get_active_device_info
from landscape.lib.network import get_fqdn
from landscape.lib.persist import Persist
from landscape.lib.tag import is_valid_tag
Expand Down Expand Up @@ -601,6 +608,54 @@ def decode_base64_ssl_public_certificate(config):
config.ssl_public_key = store_public_key_data(config, decoded_cert)


def generate_computer_title(auto_enroll_config):
"""Generate the computer title.
This follows the LA017 specification and falls back to `hostname`
if generating the title fails due to missing data.
"""
snap_info = get_snap_info()
wait_for_serial = auto_enroll_config.get("wait-for-serial-as", True)
if "serial" not in snap_info and wait_for_serial:
return

hostname = get_fqdn()
wait_for_hostname = auto_enroll_config.get("wait-for-hostname", False)
if hostname == "localhost" and wait_for_hostname:
return

(nic,) = get_active_device_info(default_only=True)[0:1] or [{}]

lshw = subprocess.run(
["lshw", "-json", "-quiet", "-c", "system"],
capture_output=True,
text=True,
)
(hardware,) = json.loads(lshw.stdout)[0:1] or [{}]

computer_title_pattern = auto_enroll_config.get(
"computer-title-pattern",
"${hostname}",
)
title = expandvars(
computer_title_pattern,
serial=snap_info.get("serial", ""),
model=snap_info.get("model", ""),
brand=snap_info.get("brand", ""),
hostname=hostname,
ip=nic.get("ip_address", ""),
mac=nic.get("mac_address", ""),
prodiden=hardware.get("product", ""),
serialno=hardware.get("serial", ""),
datetime=datetime.now(timezone.utc),
)

if title == "": # on the off-chance the title is empty
title = hostname

return title


def setup(config):
"""
Perform steps to ensure that landscape-client is correctly configured
Expand Down
22 changes: 22 additions & 0 deletions landscape/client/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from landscape import VERSION
from landscape.client import DEFAULT_CONFIG
from landscape.client import snap_http
from landscape.client.upgraders import UPGRADE_MANAGERS
from landscape.lib import logging
from landscape.lib.config import BaseConfiguration as _BaseConfiguration
Expand Down Expand Up @@ -189,6 +190,27 @@ def juju_filename(self):
backwards-compatibility."""
return os.path.join(self.data_path, "juju-info.json")

def auto_configure(self):
"""Automatically configure the client snap."""
from landscape.client.configuration import generate_computer_title

client_conf = snap_http.get_conf("landscape-client").result
auto_enroll_conf = client_conf.get("auto-register", {})

enabled = auto_enroll_conf.get("enabled", False)
configured = auto_enroll_conf.get("configured", False)
if not enabled or configured:
return

title = generate_computer_title(auto_enroll_conf)
if title:
self.computer_title = title
self.write()

auto_enroll_conf["configured"] = True
client_conf["auto-register"] = auto_enroll_conf
snap_http.set_conf("landscape-client", client_conf)


def get_versioned_persist(service):
"""Get a L{Persist} database with upgrade rules applied.
Expand Down
13 changes: 6 additions & 7 deletions landscape/client/monitor/computerinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from twisted.internet.defer import returnValue

from landscape.client.monitor.plugin import MonitorPlugin
from landscape.client.snap_utils import get_assertions
from landscape.client.snap_utils import get_snap_info
from landscape.lib.cloud import fetch_ec2_meta_data
from landscape.lib.fetch import fetch_async
from landscape.lib.fs import read_text_file
Expand Down Expand Up @@ -221,10 +221,9 @@ def log_success(result):
def _create_snap_info_message(self):
"""Create message with the snapd serial metadata."""
message = {}
assertions = get_assertions("serial")
if assertions:
assertion = assertions[0]
self._add_if_new(message, "brand", assertion["brand-id"])
self._add_if_new(message, "model", assertion["model"])
self._add_if_new(message, "serial", assertion["serial"])
snap_info = get_snap_info()
if snap_info:
self._add_if_new(message, "brand", snap_info["brand"])
self._add_if_new(message, "model", snap_info["model"])
self._add_if_new(message, "serial", snap_info["serial"])
return message
4 changes: 2 additions & 2 deletions landscape/client/monitor/tests/test_computerinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def test_fetch_ec2_meta_data_bad_result_retry(self):
result,
)

@mock.patch("landscape.client.monitor.computerinfo.get_assertions")
@mock.patch("landscape.client.snap_utils.get_assertions")
def test_snap_info(self, mock_get_assertions):
"""Test getting the snap info message."""
mock_get_assertions.return_value = [
Expand All @@ -586,7 +586,7 @@ def test_snap_info(self, mock_get_assertions):
"03961d5d-26e5-443f-838d-6db046126bea",
)

@mock.patch("landscape.client.monitor.computerinfo.get_assertions")
@mock.patch("landscape.client.snap_utils.get_assertions")
def test_snap_info_no_results(self, mock_get_assertions):
"""Test getting the snap info message when there are no results.
Expand Down
13 changes: 13 additions & 0 deletions landscape/client/snap_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,16 @@ def get_assertions(assertion_type: str):
assertions.append(assertion)

return assertions


def get_snap_info():
"""Get the snap device information."""
info = {}

serial_as = get_assertions("serial")
if serial_as:
info["serial"] = serial_as[0]["serial"]
info["model"] = serial_as[0]["model"]
info["brand"] = serial_as[0]["brand-id"]

return info
172 changes: 172 additions & 0 deletions landscape/client/tests/test_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from landscape.client.configuration import exchange_failure
from landscape.client.configuration import EXIT_NOT_REGISTERED
from landscape.client.configuration import failure
from landscape.client.configuration import generate_computer_title
from landscape.client.configuration import got_connection
from landscape.client.configuration import got_error
from landscape.client.configuration import handle_registration_errors
Expand Down Expand Up @@ -2760,3 +2761,174 @@ def test_function(self, Identity, Persist):
Persist().save.assert_called_once_with()
Identity.assert_called_once_with(config, Persist())
self.assertEqual(Identity().secure_id, "fancysecureid")


class GenerateComputerTitleTest(unittest.TestCase):
"""Tests for the `generate_computer_title` function."""

@mock.patch("landscape.client.configuration.subprocess")
@mock.patch("landscape.client.configuration.get_active_device_info")
@mock.patch("landscape.client.configuration.get_fqdn")
@mock.patch("landscape.client.configuration.get_snap_info")
def test_generate_computer_title(
self,
mock_snap_info,
mock_fqdn,
mock_active_device_info,
mock_subprocess,
):
"""Returns a computer title matching `computer-title-pattern`."""
mock_snap_info.return_value = {
"serial": "f315cab5-ba74-4d3c-be85-713406455773",
"model": "generic-classic",
"brand": "generic",
}
mock_fqdn.return_value = "terra"
mock_active_device_info.return_value = [
{
"interface": "wlp108s0",
"ip_address": "192.168.0.104",
"mac_address": "5c:80:b6:99:42:8d",
"broadcast_address": "192.168.0.255",
"netmask": "255.255.255.0",
},
]
mock_subprocess.run.return_value.stdout = """
[{
"id" : "terra",
"class" : "system",
"claimed" : true,
"handle" : "DMI:0002",
"description" : "Convertible",
"product" : "HP EliteBook x360 1030 G4 (8TK37UC#ABA)",
"vendor" : "HP",
"serial" : "ABCDE"
}]
"""

title = generate_computer_title(
{
"enabled": True,
"configured": False,
"computer-title-pattern": "${model:8:7}-${serial:0:8}",
"wait-for-serial-as": True,
"wait-for-hostname": True,
},
)
self.assertEqual(title, "classic-f315cab5")

@mock.patch("landscape.client.configuration.get_snap_info")
def test_generate_computer_title_wait_for_serial_no_serial_assertion(
self,
mock_snap_info,
):
"""Returns `None`."""
mock_snap_info.return_value = {}

title = generate_computer_title(
{
"enabled": True,
"configured": False,
"computer-title-pattern": "${model:8:7}-${serial:0:8}",
"wait-for-serial-as": True,
"wait-for-hostname": True,
},
)
self.assertIsNone(title)

@mock.patch("landscape.client.configuration.get_fqdn")
@mock.patch("landscape.client.configuration.get_snap_info")
def test_generate_computer_title_wait_for_hostname(
self,
mock_snap_info,
mock_fqdn,
):
"""Returns `None`."""
mock_snap_info.return_value = {
"serial": "f315cab5-ba74-4d3c-be85-713406455773",
"model": "generic-classic",
"brand": "generic",
}
mock_fqdn.return_value = "localhost"

title = generate_computer_title(
{
"enabled": True,
"configured": False,
"computer-title-pattern": "${model:8:7}-${serial:0:8}",
"wait-for-serial-as": True,
"wait-for-hostname": True,
},
)
self.assertIsNone(title)

@mock.patch("landscape.client.configuration.subprocess")
@mock.patch("landscape.client.configuration.get_active_device_info")
@mock.patch("landscape.client.configuration.get_fqdn")
@mock.patch("landscape.client.configuration.get_snap_info")
def test_generate_computer_title_no_nic(
self,
mock_snap_info,
mock_fqdn,
mock_active_device_info,
mock_subprocess,
):
"""Returns a title (almost) matching `computer-title-pattern`."""
mock_snap_info.return_value = {
"serial": "f315cab5-ba74-4d3c-be85-713406455773",
"model": "generic-classic",
"brand": "generic",
}
mock_fqdn.return_value = "terra"
mock_active_device_info.return_value = []
mock_subprocess.run.return_value.stdout = """
[{
"id" : "terra",
"class" : "system",
"claimed" : true,
"handle" : "DMI:0002",
"description" : "Convertible",
"product" : "HP EliteBook x360 1030 G4 (8TK37UC#ABA)",
"vendor" : "HP",
"serial" : "ABCDE"
}]
"""

title = generate_computer_title(
{
"enabled": True,
"configured": False,
"computer-title-pattern": "${hostname}-${ip}",
"wait-for-serial-as": True,
"wait-for-hostname": True,
},
)
self.assertEqual(title, "terra-")

@mock.patch("landscape.client.configuration.subprocess")
@mock.patch("landscape.client.configuration.get_active_device_info")
@mock.patch("landscape.client.configuration.get_fqdn")
@mock.patch("landscape.client.configuration.get_snap_info")
def test_generate_computer_title_with_missing_data(
self,
mock_snap_info,
mock_fqdn,
mock_active_device_info,
mock_subprocess,
):
"""Returns the default title `hostname`."""
mock_snap_info.return_value = {}
mock_fqdn.return_value = "localhost"
mock_active_device_info.return_value = []
mock_subprocess.run.return_value.stdout = "[]"

title = generate_computer_title(
{
"enabled": True,
"configured": False,
"computer-title-pattern": "${mac}${serialno}",
"wait-for-serial-as": False,
"wait-for-hostname": False,
},
)
self.assertEqual(title, "localhost")
Loading

0 comments on commit cd05b56

Please sign in to comment.