Skip to content

Commit

Permalink
Merge branch 'dsuch-f-gh755-multi-cpu'
Browse files Browse the repository at this point in the history
  • Loading branch information
dsuch committed Aug 24, 2023
2 parents ca69886 + 9d33c92 commit e62d0a8
Show file tree
Hide file tree
Showing 40 changed files with 733 additions and 475 deletions.
44 changes: 36 additions & 8 deletions code/zato-client/src/zato/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@

mod_logger = logging.getLogger(__name__)

# ################################################################################################################################
# ################################################################################################################################

if 0:
from zato.common.typing_ import any_, strtuple

# ################################################################################################################################
# Version
# ################################################################################################################################
Expand Down Expand Up @@ -323,6 +329,11 @@ def __init__(self, address, path, auth=None, session=None, to_bunch=False,
max_response_repr=DEFAULT_MAX_RESPONSE_REPR, max_cid_repr=DEFAULT_MAX_CID_REPR, logger=None,
tls_verify=True):
self.address = address

self.auth = auth # type: strtuple
self.username = auth[0] # type: str
self.password = auth[1] # type: str

self.service_address = '{}{}'.format(address, path)
self.session = session or requests.session()

Expand Down Expand Up @@ -425,10 +436,10 @@ def _invoke(self, name=None, payload='', headers=None, channel='invoke', data_fo
return super(AnyServiceInvoker, self).invoke(dumps(request, default=default_json_handler),
ServiceInvokeResponse, is_async, headers, output_repeated)

def invoke(self, *args, **kwargs):
def invoke(self, *args:'any_', **kwargs:'any_') -> 'any_':
return self._invoke(is_async=False, *args, **kwargs)

def invoke_async(self, *args, **kwargs):
def invoke_async(self, *args:'any_', **kwargs:'any_') -> 'any_':
return self._invoke(is_async=True, *args, **kwargs)

# ################################################################################################################################
Expand All @@ -455,7 +466,15 @@ def get_client_from_credentials(server_url:'str', client_auth:'tuple') -> 'ZatoC

# ################################################################################################################################

def get_client_from_server_conf(server_dir, client_auth_func, get_config_func, server_url=None, stdin_data=None):
def get_client_from_server_conf(
server_dir,
client_auth_func,
get_config_func,
server_url=None,
stdin_data=None,
*,
url_path=None,
):
""" Returns a Zato client built out of data found in a given server's config files.
"""
# stdlib
Expand All @@ -473,16 +492,25 @@ def get_client_from_server_conf(server_dir, client_auth_func, get_config_func, s

secrets_config = ConfigObj(os.path.join(repo_location, 'secrets.conf'), use_zato=False)
secrets_conf = get_config_func(
repo_location, 'secrets.conf', needs_user_config=False,
crypto_manager=crypto_manager, secrets_conf=secrets_config)

config = get_config_func(repo_location, 'server.conf', crypto_manager=crypto_manager, secrets_conf=secrets_conf)
repo_location,
'secrets.conf',
needs_user_config=False,
crypto_manager=crypto_manager,
secrets_conf=secrets_config
)

config = get_config_func(
repo_location,
'server.conf',
crypto_manager=crypto_manager,
secrets_conf=secrets_conf,
)

# Note that we cannot use 0.0.0.0 under Windows but, since it implies localhost, we can just replace it as below.
server_url = server_url if server_url else config.main.gunicorn_bind # type: str
server_url = server_url.replace('0.0.0.0', '127.0.0.1')

client_auth = client_auth_func(config, repo_location, crypto_manager, False)
client_auth = client_auth_func(config, repo_location, crypto_manager, False, url_path=url_path)

client = ZatoClient('http://{}'.format(server_url), '/zato/admin/invoke', client_auth, max_response_repr=15000)
session = get_odb_session_from_server_config(config, None, False)
Expand Down
2 changes: 2 additions & 0 deletions code/zato-common/src/zato/common/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,8 @@ class ContentType:

class IPC:

Status_OK = 'ok'

class Default:
Timeout = 90

Expand Down
11 changes: 3 additions & 8 deletions code/zato-common/src/zato/common/aux_server/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from gevent.pywsgi import WSGIServer

# Zato
from zato.common.api import ZATO_ODB_POOL_NAME
from zato.common.api import IPC as Common_IPC, ZATO_ODB_POOL_NAME
from zato.common.broker_message import code_to_name
from zato.common.crypto.api import CryptoManager, is_string_equal
from zato.common.odb.api import ODBManager, PoolStore
Expand Down Expand Up @@ -342,7 +342,7 @@ def __call__(self, env:'anydict', start_response:'callable_') -> 'byteslist':
response = self.handle_api_request(request)

# If we are here, it means that there was no exception
status_text = 'ok'
status_text = Common_IPC.Status_OK
status_code = StatusCode.OK

except Exception:
Expand All @@ -363,12 +363,7 @@ def __call__(self, env:'anydict', start_response:'callable_') -> 'byteslist':
}

# .. make sure that we return bytes representing a JSON object ..
try:
return_data = dumps(return_data)
except TypeError as e:
e
e
return_data = '{}'
return_data = dumps(return_data)
return_data = return_data.encode('utf8')

start_response(status_code, headers)
Expand Down
28 changes: 24 additions & 4 deletions code/zato-common/src/zato/common/component_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@
from zato.common.util.api import current_host
from zato.common.util.open_ import open_r

# ################################################################################################################################
# ################################################################################################################################

if 0:
from zato.common.typing_ import stranydict

# ################################################################################################################################
# ################################################################################################################################

def format_connections(conns, format):
""" Formats a list of connections according to the output format.
"""
Expand Down Expand Up @@ -72,8 +81,12 @@ def get_worker_pids(component_path):
master_proc_pid = int(open_r(os.path.join(component_path, MISC.PIDFILE)).read())
return sorted(elem.pid for elem in Process(master_proc_pid).children())

def get_info(component_path, format, _now=datetime.utcnow):
component_details = open_r(os.path.join(component_path, ZATO_INFO_FILE)).read()
def get_info(component_path, format, _now=datetime.utcnow) -> 'stranydict':

master_proc_pid_file = None

component_details_file = open_r(os.path.join(component_path, ZATO_INFO_FILE))
component_details = component_details_file.read()

out = {
'component_details': component_details,
Expand All @@ -94,10 +107,14 @@ def get_info(component_path, format, _now=datetime.utcnow):

master_proc_pid = None
try:
master_proc_pid = int(open_r(os.path.join(component_path, MISC.PIDFILE)).read())
master_proc_pid_file = open_r(os.path.join(component_path, MISC.PIDFILE))
master_proc_pid = int(master_proc_pid_file.read())
except(IOError, ValueError):
# Ok, no such file or it's empty
# Ok, no such file or it is empty
pass
finally:
if master_proc_pid_file:
master_proc_pid_file.close()

if master_proc_pid:
out['component_running'] = True
Expand Down Expand Up @@ -133,6 +150,9 @@ def get_info(component_path, format, _now=datetime.utcnow):
out['worker_{}_create_time_utc'.format(pid)] = worker_create_time_utc.isoformat()
out['worker_{}_connections'.format(pid)] = format_connections(worker.connections(), format)

# Final cleanup
component_details_file.close()

return out

def format_info(value, format, cols_width=None, dumper=None):
Expand Down
16 changes: 12 additions & 4 deletions code/zato-common/src/zato/common/ipc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
# ################################################################################################################################

if 0:
from zato.common.typing_ import anydict, callable_
from zato.common.ipc.client import IPCResponse
from zato.common.typing_ import callable_

# ################################################################################################################################
# ################################################################################################################################
Expand Down Expand Up @@ -86,7 +87,7 @@ def invoke_by_pid(
server_name, # type: str
target_pid, # type: int
timeout=90 # type: int
) -> 'anydict':
) -> 'IPCResponse':
""" Invokes a service in a specific process synchronously through IPC.
"""

Expand All @@ -105,8 +106,15 @@ def invoke_by_pid(
url_path = fs_safe_name(url_path)

client = IPCClient(ipc_host, ipc_port, IPC.Credentials.Username, self.password)
response = client.invoke(service, request, url_path, timeout=timeout)
response
response = client.invoke(
service,
request,
url_path,
cluster_name=cluster_name,
server_name=server_name,
server_pid=target_pid,
timeout=timeout
)
return response

# ################################################################################################################################
Expand Down
53 changes: 49 additions & 4 deletions code/zato-common/src/zato/common/ipc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,39 @@
from requests import post as requests_post

# Zato
from zato.common.api import IPC as Common_IPC
from zato.common.broker_message import SERVER_IPC
from zato.common.typing_ import dataclass

# ################################################################################################################################
# ################################################################################################################################

if 0:
from zato.common.typing_ import any_, anydict
from zato.common.typing_ import any_, anydict, anylist

# ################################################################################################################################
# ################################################################################################################################

@dataclass(init=False)
class IPCResponseMeta:

cid: 'str'
is_ok: 'bool'

service: 'str'
request: 'any_'

cluster_name: 'str'
server_name: 'str'
server_pid: 'int'

# ################################################################################################################################
# ################################################################################################################################

@dataclass(init=False)
class IPCResponse:
data: 'anydict | anylist | None'
meta: 'IPCResponseMeta'

# ################################################################################################################################
# ################################################################################################################################
Expand All @@ -41,7 +67,17 @@ def __init__(

# ################################################################################################################################

def invoke(self, service:'str', data:'any_', url_path:'str', timeout:'int'=90) -> 'anydict':
def invoke(
self,
service, # type: str
request, # type: any_
url_path, # type: str
*,
cluster_name, # type: str
server_name, # type: str
server_pid, # type: int
timeout=90, # type: int
) -> 'IPCResponse':

# This is where we can find the IPC server to invoke ..
url = f'http://{self.host}:{self.port}/{url_path}'
Expand All @@ -52,7 +88,7 @@ def invoke(self, service:'str', data:'any_', url_path:'str', timeout:'int'=90) -
'username': self.username,
'password': self.password,
'service': service,
'data': data,
'data': request,
})

# .. invoke the server ..
Expand All @@ -61,8 +97,17 @@ def invoke(self, service:'str', data:'any_', url_path:'str', timeout:'int'=90) -
# .. de-serialize the response ..
response = loads(response.text)

ipc_response = IPCResponse()
ipc_response.data = response['response'] or None
ipc_response.meta = IPCResponseMeta()
ipc_response.meta.cid = response['cid']
ipc_response.meta.is_ok = response['status'] == Common_IPC.Status_OK
ipc_response.meta.cluster_name = cluster_name
ipc_response.meta.server_name = server_name
ipc_response.meta.server_pid = server_pid

# .. and return its response.
return response
return ipc_response

# ################################################################################################################################

Expand Down
11 changes: 10 additions & 1 deletion code/zato-common/src/zato/common/test/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-

"""
Copyright (C) 2022, Zato Source s.r.o. https://zato.io
Copyright (C) 2023, Zato Source s.r.o. https://zato.io
Licensed under LGPLv3, see LICENSE.txt for terms and conditions.
"""
Expand All @@ -10,6 +10,7 @@
from datetime import datetime
from logging import getLogger
from tempfile import NamedTemporaryFile
from time import sleep
from random import choice, randint
from unittest import TestCase
from uuid import uuid4
Expand Down Expand Up @@ -257,6 +258,9 @@ def __init__(
def decrypt(self, data:'str') -> 'str':
return data

def invoke_all_pids(*args, **kwargs):
return []

# ################################################################################################################################
# ################################################################################################################################

Expand Down Expand Up @@ -732,6 +736,8 @@ def create_pubsub_topic(
# These parameters for the Command to invoke will always exist ..
cli_params = ['pubsub', 'create-topic', '--name', topic_name]

self.logger.info(f'Creating topic {topic_name} ({self.__class__.__name__})')

# .. whereas these ones are optional ..
if limit_retention:
cli_params.append('--limit-retention')
Expand Down Expand Up @@ -794,6 +800,9 @@ def run_zato_cli_command(
# .. invoke the service and obtain its response ..
out = invoker.invoke_cli(cli_params, command_name) # type: str

# .. let the changes propagate across servers ..
sleep(1)

# .. and let the parent class handle the result
return self._handle_cli_out(out, assert_ok, load_json)

Expand Down
5 changes: 4 additions & 1 deletion code/zato-common/src/zato/common/test/rest_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-

"""
Copyright (C) 2021, Zato Source s.r.o. https://zato.io
Copyright (C) 2023, Zato Source s.r.o. https://zato.io
Licensed under LGPLv3, see LICENSE.txt for terms and conditions.
"""
Expand All @@ -10,6 +10,7 @@
import logging
from http.client import OK
from json import dumps, loads
from time import sleep

# Bunch
from bunch import Bunch, bunchify
Expand Down Expand Up @@ -133,6 +134,8 @@ def init(self, /, username:'str'='', sec_name:'str'='') -> 'None':
command('service', 'invoke', TestConfig.server_location,
'zato.security.basic-auth.change-password', '--payload', payload)

sleep(4)

# .. and store the credentials for later use.
self._auth = (self._api_invoke_username, self._api_invoke_password)

Expand Down
Loading

0 comments on commit e62d0a8

Please sign in to comment.