Skip to content

Commit

Permalink
refactor: use synchronous message exchange for landscape-config (#262)
Browse files Browse the repository at this point in the history
  • Loading branch information
Perfect5th authored Aug 26, 2024
1 parent d53ec6f commit ec5ce26
Show file tree
Hide file tree
Showing 14 changed files with 697 additions and 1,017 deletions.
6 changes: 3 additions & 3 deletions landscape/client/broker/tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def request_with_payload(self, payload):
payload,
computer_id="34",
exchange_token="abcd-efgh",
message_api="X.Y",
message_api=b"X.Y",
)

def got_result(ignored):
Expand Down Expand Up @@ -141,7 +141,7 @@ def test_ssl_verification_positive(self):
transport.exchange,
"HI",
computer_id="34",
message_api="X.Y",
message_api=b"X.Y",
)

def got_result(ignored):
Expand Down Expand Up @@ -190,7 +190,7 @@ def test_ssl_verification_negative(self):
transport.exchange,
"HI",
computer_id="34",
message_api="X.Y",
message_api=b"X.Y",
)

def got_result(ignored):
Expand Down
111 changes: 23 additions & 88 deletions landscape/client/broker/transport.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
"""Low-level server communication."""
import logging
import pprint
import time
from dataclasses import asdict
import uuid

import pycurl
from typing import Optional
from typing import Union

from landscape import SERVER_API
from landscape import VERSION
from landscape.lib import bpickle
from landscape.lib.compat import _PY3
from landscape.client.exchange import exchange_messages
from landscape.lib.compat import unicode
from landscape.lib.fetch import fetch
from landscape.lib.format import format_delta


class HTTPTransport:
Expand All @@ -35,96 +29,37 @@ def set_url(self, url):
"""Set the URL of the remote message system."""
self._url = url

def _curl(self, payload, computer_id, exchange_token, message_api):
# There are a few "if _PY3" checks below, because for Python 3 we
# want to convert a number of values from bytes to string, before
# assigning them to the headers.
if _PY3 and isinstance(message_api, bytes):
message_api = message_api.decode("ascii")
headers = {
"X-Message-API": message_api,
"User-Agent": f"landscape-client/{VERSION}",
"Content-Type": "application/octet-stream",
}
if computer_id:
if _PY3 and isinstance(computer_id, bytes):
computer_id = computer_id.decode("ascii")
headers["X-Computer-ID"] = computer_id
if exchange_token:
if _PY3 and isinstance(exchange_token, bytes):
exchange_token = exchange_token.decode("ascii")
headers["X-Exchange-Token"] = str(exchange_token)
curl = pycurl.Curl()
return (
curl,
fetch(
self._url,
post=True,
data=payload,
headers=headers,
cainfo=self._pubkey,
curl=curl,
),
)

def exchange(
self,
payload,
computer_id=None,
exchange_token=None,
message_api=SERVER_API,
):
payload: dict,
computer_id: Optional[str] = None,
exchange_token: Optional[str] = None,
message_api: bytes = SERVER_API,
) -> Union[dict, None]:
"""Exchange message data with the server.
@param payload: The object to send, it must be L{bpickle}-compatible.
@param computer_id: The computer ID to send the message as (see
also L{Identity}).
@param exchange_token: The token that the server has given us at the
last exchange. It's used to prove that we are still the same
client.
:param payload: The object to send. It must be `bpickle`-compatible.
:param computer_id: The computer ID to send the message as.
:param exchange_token: Token included in the exchange to prove client
@type: C{dict}
@return: The server's response to sent message or C{None} in case
of error.
@note: This code is thread safe (HOPEFULLY).
:return: The server's response to the sent message or `None` if there
was an error.
:note: This code is thread safe (HOPEFULLY).
"""
spayload = bpickle.dumps(payload)
start_time = time.time()
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
logging.debug("Sending payload:\n%s", pprint.pformat(payload))
try:
curly, data = self._curl(
spayload,
computer_id,
exchange_token,
message_api,
)
except Exception:
logging.exception(f"Error contacting the server at {self._url}.")
raise
else:
logging.info(
"Sent %d bytes and received %d bytes in %s.",
len(spayload),
len(data),
format_delta(time.time() - start_time),
response = exchange_messages(
payload,
self._url,
cainfo=self._pubkey,
computer_id=computer_id,
exchange_token=exchange_token,
server_api=message_api.decode(),
)

try:
response = bpickle.loads(data)
except Exception:
logging.exception(f"Server returned invalid data: {data!r}")
return None
else:
if logging.getLogger().getEffectiveLevel() <= logging.DEBUG:
logging.debug(
"Received payload:\n%s",
pprint.pformat(response),
)

return response
return asdict(response)


class FakeTransport:
Expand Down
Loading

0 comments on commit ec5ce26

Please sign in to comment.