From ce32c0684311923fb80dd04221d5fd5120170cf9 Mon Sep 17 00:00:00 2001 From: Zach Koopmans Date: Mon, 2 Dec 2019 22:51:55 -0800 Subject: Import benchmark-tools to main repository. This has adapted for use with bazel from the original commit a26e93769ebefd82593a43e22fb13a09717cfa6d. In particular, the style has been made consistent with internal python style guidelines, and the packages (including the main entrypoint) have been refactored in order to allow bazel testing targets. PiperOrigin-RevId: 283484433 --- benchmarks/harness/BUILD | 89 ++++++++++ benchmarks/harness/__init__.py | 25 +++ benchmarks/harness/benchmark_driver.py | 85 +++++++++ benchmarks/harness/container.py | 181 +++++++++++++++++++ benchmarks/harness/machine.py | 191 +++++++++++++++++++++ benchmarks/harness/machine_mocks/BUILD | 9 + benchmarks/harness/machine_mocks/__init__.py | 81 +++++++++ benchmarks/harness/machine_producers/BUILD | 35 ++++ benchmarks/harness/machine_producers/__init__.py | 13 ++ .../harness/machine_producers/machine_producer.py | 30 ++++ .../harness/machine_producers/mock_producer.py | 31 ++++ .../harness/machine_producers/yaml_producer.py | 106 ++++++++++++ benchmarks/harness/ssh_connection.py | 111 ++++++++++++ benchmarks/harness/tunnel_dispatcher.py | 82 +++++++++ 14 files changed, 1069 insertions(+) create mode 100644 benchmarks/harness/BUILD create mode 100644 benchmarks/harness/__init__.py create mode 100644 benchmarks/harness/benchmark_driver.py create mode 100644 benchmarks/harness/container.py create mode 100644 benchmarks/harness/machine.py create mode 100644 benchmarks/harness/machine_mocks/BUILD create mode 100644 benchmarks/harness/machine_mocks/__init__.py create mode 100644 benchmarks/harness/machine_producers/BUILD create mode 100644 benchmarks/harness/machine_producers/__init__.py create mode 100644 benchmarks/harness/machine_producers/machine_producer.py create mode 100644 benchmarks/harness/machine_producers/mock_producer.py create mode 100644 benchmarks/harness/machine_producers/yaml_producer.py create mode 100644 benchmarks/harness/ssh_connection.py create mode 100644 benchmarks/harness/tunnel_dispatcher.py (limited to 'benchmarks/harness') diff --git a/benchmarks/harness/BUILD b/benchmarks/harness/BUILD new file mode 100644 index 000000000..9546220c4 --- /dev/null +++ b/benchmarks/harness/BUILD @@ -0,0 +1,89 @@ +load("//benchmarks:defs.bzl", "py_library", "requirement") + +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "harness", + srcs = ["__init__.py"], +) + +py_library( + name = "benchmark_driver", + srcs = ["benchmark_driver.py"], + deps = [ + "//benchmarks/harness/machine_mocks", + "//benchmarks/harness/machine_producers:machine_producer", + "//benchmarks/suites", + ], +) + +py_library( + name = "container", + srcs = ["container.py"], + deps = [ + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) + +py_library( + name = "machine", + srcs = ["machine.py"], + deps = [ + "//benchmarks/harness", + "//benchmarks/harness:container", + "//benchmarks/harness:ssh_connection", + "//benchmarks/harness:tunnel_dispatcher", + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) + +py_library( + name = "ssh_connection", + srcs = ["ssh_connection.py"], + deps = [ + "//benchmarks/harness", + requirement("bcrypt", False), + requirement("cffi", False), + requirement("paramiko", True), + requirement("cryptography", False), + ], +) + +py_library( + name = "tunnel_dispatcher", + srcs = ["tunnel_dispatcher.py"], + deps = [ + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("pexpect", True), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) diff --git a/benchmarks/harness/__init__.py b/benchmarks/harness/__init__.py new file mode 100644 index 000000000..a7f34da9e --- /dev/null +++ b/benchmarks/harness/__init__.py @@ -0,0 +1,25 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Core benchmark utilities.""" + +import os + +# LOCAL_WORKLOADS_PATH defines the path to use for local workloads. This is a +# format string that accepts a single string parameter. +LOCAL_WORKLOADS_PATH = os.path.join( + os.path.dirname(__file__), "../workloads/{}") + +# REMOTE_WORKLOADS_PATH defines the path to use for storing the workloads on the +# remote host. This is a format string that accepts a single string parameter. +REMOTE_WORKLOADS_PATH = "workloads/{}" diff --git a/benchmarks/harness/benchmark_driver.py b/benchmarks/harness/benchmark_driver.py new file mode 100644 index 000000000..9abc21b54 --- /dev/null +++ b/benchmarks/harness/benchmark_driver.py @@ -0,0 +1,85 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Main driver for benchmarks.""" + +import copy +import statistics +import threading +import types + +from benchmarks import suites +from benchmarks.harness.machine_producers import machine_producer + + +# pylint: disable=too-many-instance-attributes +class BenchmarkDriver: + """Allocates machines and invokes a benchmark method.""" + + def __init__(self, + producer: machine_producer.MachineProducer, + method: types.FunctionType, + runs: int = 1, + **kwargs): + + self._producer = producer + self._method = method + self._kwargs = copy.deepcopy(kwargs) + self._threads = [] + self.lock = threading.RLock() + self._runs = runs + self._metric_results = {} + + def start(self): + """Starts a benchmark thread.""" + for _ in range(self._runs): + thread = threading.Thread(target=self._run_method) + thread.start() + self._threads.append(thread) + + def join(self): + """Joins the thread.""" + # pylint: disable=expression-not-assigned + [t.join() for t in self._threads] + + def _run_method(self): + """Runs all benchmarks.""" + machines = self._producer.get_machines( + suites.benchmark_machines(self._method)) + try: + result = self._method(*machines, **self._kwargs) + for name, res in result: + with self.lock: + if name in self._metric_results: + self._metric_results[name].append(res) + else: + self._metric_results[name] = [res] + finally: + # Always release. + self._producer.release_machines(machines) + + def median(self): + """Returns the median result, after join is finished.""" + for key, value in self._metric_results.items(): + yield key, [statistics.median(value)] + + def all(self): + """Returns all results.""" + for key, value in self._metric_results.items(): + yield key, value + + def meanstd(self): + """Returns all results.""" + for key, value in self._metric_results.items(): + mean = statistics.mean(value) + yield key, [mean, statistics.stdev(value, xbar=mean)] diff --git a/benchmarks/harness/container.py b/benchmarks/harness/container.py new file mode 100644 index 000000000..585436e20 --- /dev/null +++ b/benchmarks/harness/container.py @@ -0,0 +1,181 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Container definitions.""" + +import contextlib +import logging +import pydoc +import types +from typing import Tuple + +import docker +import docker.errors + +from benchmarks import workloads + + +class Container: + """Abstract container. + + Must be a context manager. + + Usage: + + with Container(client, image, ...): + ... + """ + + def run(self, **env) -> str: + """Run the container synchronously.""" + raise NotImplementedError + + def detach(self, **env): + """Run the container asynchronously.""" + raise NotImplementedError + + def address(self) -> Tuple[str, int]: + """Return the bound address for the container.""" + raise NotImplementedError + + def get_names(self) -> types.GeneratorType: + """Return names of all containers.""" + raise NotImplementedError + + +# pylint: disable=too-many-instance-attributes +class DockerContainer(Container): + """Class that handles creating a docker container.""" + + # pylint: disable=too-many-arguments + def __init__(self, + client: docker.DockerClient, + host: str, + image: str, + count: int = 1, + runtime: str = "runc", + port: int = 0, + **kwargs): + """Trys to setup "count" containers. + + Args: + client: A docker client from dockerpy. + host: The host address the image is running on. + image: The name of the image to run. + count: The number of containers to setup. + runtime: The container runtime to use. + port: The port to reserve. + **kwargs: Additional container options. + """ + assert count >= 1 + assert port == 0 or count == 1 + self._client = client + self._host = host + self._containers = [] + self._count = count + self._image = image + self._runtime = runtime + self._port = port + self._kwargs = kwargs + if port != 0: + self._ports = {"%d/tcp" % port: None} + else: + self._ports = {} + + @contextlib.contextmanager + def detach(self, **env): + env = ["%s=%s" % (key, value) for (key, value) in env.items()] + # Start all containers. + for _ in range(self._count): + try: + # Start the container in a detached mode. + container = self._client.containers.run( + self._image, + detach=True, + remove=True, + runtime=self._runtime, + ports=self._ports, + environment=env, + **self._kwargs) + logging.info("Started detached container %s -> %s", self._image, + container.attrs["Id"]) + self._containers.append(container) + except Exception as exc: + self._clean_containers() + raise exc + try: + # Wait for all containers to be up. + for container in self._containers: + while not container.attrs["State"]["Running"]: + container = self._client.containers.get(container.attrs["Id"]) + yield self + finally: + self._clean_containers() + + def address(self) -> Tuple[str, int]: + assert self._count == 1 + assert self._port != 0 + container = self._client.containers.get(self._containers[0].attrs["Id"]) + port = container.attrs["NetworkSettings"]["Ports"][ + "%d/tcp" % self._port][0]["HostPort"] + return (self._host, port) + + def get_names(self) -> types.GeneratorType: + for container in self._containers: + yield container.name + + def run(self, **env) -> str: + env = ["%s=%s" % (key, value) for (key, value) in env.items()] + return self._client.containers.run( + self._image, + runtime=self._runtime, + ports=self._ports, + remove=True, + environment=env, + **self._kwargs).decode("utf-8") + + def _clean_containers(self): + """Kills all containers.""" + for container in self._containers: + try: + container.kill() + except docker.errors.NotFound: + pass + + +class MockContainer(Container): + """Mock of Container.""" + + def __init__(self, workload: str): + self._workload = workload + + def __enter__(self): + return self + + def run(self, **env): + # Lookup sample data if any exists for the workload module. We use a + # well-defined test locate and a well-defined sample function. + mod = pydoc.locate(workloads.__name__ + "." + self._workload) + if hasattr(mod, "sample"): + return mod.sample(**env) + return "" # No output. + + def address(self) -> Tuple[str, int]: + return ("example.com", 80) + + def get_names(self) -> types.GeneratorType: + yield "mock" + + @contextlib.contextmanager + def detach(self, **env): + yield self diff --git a/benchmarks/harness/machine.py b/benchmarks/harness/machine.py new file mode 100644 index 000000000..2166d040a --- /dev/null +++ b/benchmarks/harness/machine.py @@ -0,0 +1,191 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Machine abstraction. This is the primary API for benchmarks.""" + +import logging +import re +import subprocess +import time +from typing import Tuple + +import docker + +from benchmarks import harness +from benchmarks.harness import container +from benchmarks.harness import machine_mocks +from benchmarks.harness import ssh_connection +from benchmarks.harness import tunnel_dispatcher + + +class Machine: + """The machine object is the primary object for benchmarks. + + Machine objects are passed to each metric function call and benchmarks use + machines to access real connections to those machines. + """ + + def run(self, cmd: str) -> Tuple[str, str]: + """Convenience method for running a bash command on a machine object. + + Some machines may point to the local machine, and thus, do not have ssh + connections. Run runs a command either local or over ssh and returns the + output stdout and stderr as strings. + + Args: + cmd: The command to run as a string. + + Returns: + The command output. + """ + raise NotImplementedError + + def read(self, path: str) -> str: + """Reads the contents of some file. + + This will be mocked. + + Args: + path: The path to the file to be read. + + Returns: + The file contents. + """ + raise NotImplementedError + + def pull(self, workload: str) -> str: + """Send the given workload to the machine, build and tag it. + + All images must be defined by the workloads directory. + + Args: + workload: The workload name. + + Returns: + The workload tag. + """ + raise NotImplementedError + + def container(self, image: str, **kwargs) -> container.Container: + """Returns a container object. + + Args: + image: The pulled image tag. + **kwargs: Additional container options. + + Returns: + :return: a container.Container object. + """ + raise NotImplementedError + + def sleep(self, amount: float): + """Sleeps the given amount of time.""" + raise NotImplementedError + + +class MockMachine(Machine): + """A mocked machine.""" + + def run(self, cmd: str) -> Tuple[str, str]: + return "", "" + + def read(self, path: str) -> str: + return machine_mocks.Readfile(path) + + def pull(self, workload: str) -> str: + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + return container.MockContainer(image) + + def sleep(self, amount: float): + pass + + +def get_address(machine: Machine) -> str: + """Return a machine's default address.""" + default_route, _ = machine.run("ip route get 8.8.8.8") + return re.search(" src ([0-9.]+) ", default_route).group(1) + + +class LocalMachine(Machine): + """The local machine.""" + + def __init__(self, name): + self._name = name + self._docker_client = docker.from_env() + + def __str__(self): + return self._name + + def run(self, cmd: str) -> Tuple[str, str]: + process = subprocess.Popen( + cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + return stdout.decode("utf-8"), stderr.decode("utf-8") + + def read(self, path: str) -> str: + # Read the exact path locally. + return open(path, "r").read() + + def pull(self, workload: str) -> str: + # Run the docker build command locally. + logging.info("Building %s@%s locally...", workload, self._name) + self.run("docker build --tag={} {}".format( + workload, harness.LOCAL_WORKLOADS_PATH.format(workload))) + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + # Return a local docker container directly. + return container.DockerContainer(self._docker_client, get_address(self), + image, **kwargs) + + def sleep(self, amount: float): + time.sleep(amount) + + +class RemoteMachine(Machine): + """Remote machine accessible via an SSH connection.""" + + def __init__(self, name, **kwargs): + self._name = name + self._ssh_connection = ssh_connection.SSHConnection(name, **kwargs) + self._tunnel = tunnel_dispatcher.Tunnel(name, **kwargs) + self._tunnel.connect() + self._docker_client = self._tunnel.get_docker_client() + + def __str__(self): + return self._name + + def run(self, cmd: str) -> Tuple[str, str]: + return self._ssh_connection.run(cmd) + + def read(self, path: str) -> str: + # Just cat remotely. + stdout, stderr = self._ssh_connection.run("cat '{}'".format(path)) + return stdout + stderr + + def pull(self, workload: str) -> str: + # Push to the remote machine and build. + logging.info("Building %s@%s remotely...", workload, self._name) + remote_path = self._ssh_connection.send_workload(workload) + self.run("docker build --tag={} {}".format(workload, remote_path)) + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + # Return a remote docker container. + return container.DockerContainer(self._docker_client, get_address(self), + image, **kwargs) + + def sleep(self, amount: float): + time.sleep(amount) diff --git a/benchmarks/harness/machine_mocks/BUILD b/benchmarks/harness/machine_mocks/BUILD new file mode 100644 index 000000000..c8ec4bc79 --- /dev/null +++ b/benchmarks/harness/machine_mocks/BUILD @@ -0,0 +1,9 @@ +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "machine_mocks", + srcs = ["__init__.py"], +) diff --git a/benchmarks/harness/machine_mocks/__init__.py b/benchmarks/harness/machine_mocks/__init__.py new file mode 100644 index 000000000..00f0085d7 --- /dev/null +++ b/benchmarks/harness/machine_mocks/__init__.py @@ -0,0 +1,81 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Machine mock files.""" + +MEMINFO = """\ +MemTotal: 7652344 kB +MemFree: 7174724 kB +MemAvailable: 7152008 kB +Buffers: 7544 kB +Cached: 178856 kB +SwapCached: 0 kB +Active: 270928 kB +Inactive: 68436 kB +Active(anon): 153124 kB +Inactive(anon): 880 kB +Active(file): 117804 kB +Inactive(file): 67556 kB +Unevictable: 0 kB +Mlocked: 0 kB +SwapTotal: 0 kB +SwapFree: 0 kB +Dirty: 900 kB +Writeback: 0 kB +AnonPages: 153000 kB +Mapped: 129120 kB +Shmem: 1044 kB +Slab: 60864 kB +SReclaimable: 22792 kB +SUnreclaim: 38072 kB +KernelStack: 2672 kB +PageTables: 5756 kB +NFS_Unstable: 0 kB +Bounce: 0 kB +WritebackTmp: 0 kB +CommitLimit: 3826172 kB +Committed_AS: 663836 kB +VmallocTotal: 34359738367 kB +VmallocUsed: 0 kB +VmallocChunk: 0 kB +HardwareCorrupted: 0 kB +AnonHugePages: 0 kB +ShmemHugePages: 0 kB +ShmemPmdMapped: 0 kB +CmaTotal: 0 kB +CmaFree: 0 kB +HugePages_Total: 0 +HugePages_Free: 0 +HugePages_Rsvd: 0 +HugePages_Surp: 0 +Hugepagesize: 2048 kB +DirectMap4k: 94196 kB +DirectMap2M: 4624384 kB +DirectMap1G: 3145728 kB +""" + +CONTENTS = { + "/proc/meminfo": MEMINFO, +} + + +def Readfile(path: str) -> str: + """Reads a mock file. + + Args: + path: The target path. + + Returns: + Mocked file contents or None. + """ + return CONTENTS.get(path, None) diff --git a/benchmarks/harness/machine_producers/BUILD b/benchmarks/harness/machine_producers/BUILD new file mode 100644 index 000000000..5b2228e01 --- /dev/null +++ b/benchmarks/harness/machine_producers/BUILD @@ -0,0 +1,35 @@ +load("//benchmarks:defs.bzl", "py_library", "requirement") + +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "harness", + srcs = ["__init__.py"], +) + +py_library( + name = "machine_producer", + srcs = ["machine_producer.py"], +) + +py_library( + name = "mock_producer", + srcs = ["mock_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:machine_producer", + ], +) + +py_library( + name = "yaml_producer", + srcs = ["yaml_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:machine_producer", + requirement("PyYAML", False), + ], +) diff --git a/benchmarks/harness/machine_producers/__init__.py b/benchmarks/harness/machine_producers/__init__.py new file mode 100644 index 000000000..634ef4843 --- /dev/null +++ b/benchmarks/harness/machine_producers/__init__.py @@ -0,0 +1,13 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/benchmarks/harness/machine_producers/machine_producer.py b/benchmarks/harness/machine_producers/machine_producer.py new file mode 100644 index 000000000..124ee14cc --- /dev/null +++ b/benchmarks/harness/machine_producers/machine_producer.py @@ -0,0 +1,30 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Abstract types.""" + +from typing import List + +from benchmarks.harness import machine + + +class MachineProducer: + """Abstract Machine producer.""" + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + """Returns the requested number of machines.""" + raise NotImplementedError + + def release_machines(self, machine_list: List[machine.Machine]): + """Releases the given set of machines.""" + raise NotImplementedError diff --git a/benchmarks/harness/machine_producers/mock_producer.py b/benchmarks/harness/machine_producers/mock_producer.py new file mode 100644 index 000000000..4f29ad53f --- /dev/null +++ b/benchmarks/harness/machine_producers/mock_producer.py @@ -0,0 +1,31 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Producers of mocks.""" + +from typing import List + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import machine_producer + + +class MockMachineProducer(machine_producer.MachineProducer): + """Produces MockMachine objects.""" + + def get_machines(self, num_machines: int) -> List[machine.MockMachine]: + """Returns the request number of MockMachines.""" + return [machine.MockMachine() for i in range(num_machines)] + + def release_machines(self, machine_list: List[machine.MockMachine]): + """No-op.""" + return diff --git a/benchmarks/harness/machine_producers/yaml_producer.py b/benchmarks/harness/machine_producers/yaml_producer.py new file mode 100644 index 000000000..5d334e480 --- /dev/null +++ b/benchmarks/harness/machine_producers/yaml_producer.py @@ -0,0 +1,106 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Producers based on yaml files.""" + +import os +import threading +from typing import Dict +from typing import List + +import yaml + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import machine_producer + + +class YamlMachineProducer(machine_producer.MachineProducer): + """Loads machines from a yaml file.""" + + def __init__(self, path: str): + self.machines = build_machines(path) + self.max_machines = len(self.machines) + self.machine_condition = threading.Condition() + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + if num_machines > self.max_machines: + raise ValueError( + "Insufficient Ammount of Machines. {ask} asked for and have {max_num} max." + .format(ask=num_machines, max_num=self.max_machines)) + + with self.machine_condition: + while not self._enough_machines(num_machines): + self.machine_condition.wait(timeout=1) + return [self.machines.pop(0) for _ in range(num_machines)] + + def release_machines(self, machine_list: List[machine.Machine]): + with self.machine_condition: + while machine_list: + next_machine = machine_list.pop() + self.machines.append(next_machine) + self.machine_condition.notify() + + def _enough_machines(self, ask: int): + return ask <= len(self.machines) + + +def build_machines(path: str, num_machines: str = -1) -> List[machine.Machine]: + """Builds machine objects defined by the yaml file "path". + + Args: + path: The path to a yaml file which defines machines. + num_machines: Optional limit on how many machine objects to build. + + Returns: + Machine objects in a list. + + If num_machines is set, len(machines) <= num_machines. + """ + data = parse_yaml(path) + machines = [] + for key, value in data.items(): + if len(machines) == num_machines: + return machines + if isinstance(value, dict): + machines.append(machine.RemoteMachine(key, **value)) + else: + machines.append(machine.LocalMachine(key)) + return machines + + +def parse_yaml(path: str) -> Dict[str, Dict[str, str]]: + """Parse the yaml file pointed by path. + + Args: + path: The path to yaml file. + + Returns: + The contents of the yaml file as a dictionary. + """ + data = get_file_contents(path) + return yaml.load(data, Loader=yaml.Loader) + + +def get_file_contents(path: str) -> str: + """Dumps the file contents to a string and returns them. + + Args: + path: The path to dump. + + Returns: + The file contents as a string. + """ + if not os.path.isabs(path): + path = os.path.abspath(path) + with open(path) as input_file: + return input_file.read() diff --git a/benchmarks/harness/ssh_connection.py b/benchmarks/harness/ssh_connection.py new file mode 100644 index 000000000..fcbfbcdb2 --- /dev/null +++ b/benchmarks/harness/ssh_connection.py @@ -0,0 +1,111 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""SSHConnection handles the details of SSH connections.""" + +import os +import warnings + +import paramiko + +from benchmarks import harness + +# Get rid of paramiko Cryptography Warnings. +warnings.filterwarnings(action="ignore", module=".*paramiko.*") + + +def send_one_file(client: paramiko.SSHClient, path: str, remote_dir: str): + """Sends a single file via an SSH client. + + Args: + client: The existing SSH client. + path: The local path. + remote_dir: The remote directory. + """ + filename = path.split("/").pop() + client.exec_command("mkdir -p " + remote_dir) + with client.open_sftp() as ftp_client: + ftp_client.put(path, os.path.join(remote_dir, filename)) + + +class SSHConnection: + """SSH connection to a remote machine.""" + + def __init__(self, name: str, hostname: str, key_path: str, username: str, + **kwargs): + """Sets up a paramiko ssh connection to the given hostname.""" + self._name = name # Unused. + self._hostname = hostname + self._username = username + self._key_path = key_path # RSA Key path + self._kwargs = kwargs + # SSHConnection wraps paramiko. paramiko supports RSA, ECDSA, and Ed25519 + # keys, and we've chosen to only suport and require RSA keys. paramiko + # supports RSA keys that begin with '----BEGIN RSAKEY----'. + # https://stackoverflow.com/questions/53600581/ssh-key-generated-by-ssh-keygen-is-not-recognized-by-paramiko + self.rsa_key = self._rsa() + self.run("true") # Validate. + + def _client(self) -> paramiko.SSHClient: + """Returns a connected SSH client.""" + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect( + hostname=self._hostname, + port=22, + username=self._username, + pkey=self.rsa_key, + allow_agent=False, + look_for_keys=False) + return client + + def _rsa(self): + if "key_password" in self._kwargs: + password = self._kwargs["key_password"] + else: + password = None + rsa = paramiko.RSAKey.from_private_key_file(self._key_path, password) + return rsa + + def run(self, cmd: str) -> (str, str): + """Runs a command via ssh. + + Args: + cmd: The shell command to run. + + Returns: + The contents of stdout and stderr. + """ + with self._client() as client: + _, stdout, stderr = client.exec_command(command=cmd) + stdout.channel.recv_exit_status() + stdout = stdout.read().decode("utf-8") + stderr = stderr.read().decode("utf-8") + return stdout, stderr + + def send_workload(self, name: str) -> str: + """Sends a workload to the remote machine. + + Args: + name: The workload name. + + Returns: + The remote path. + """ + with self._client() as client: + for dirpath, _, filenames in os.walk( + harness.LOCAL_WORKLOADS_PATH.format(name)): + for filename in filenames: + send_one_file(client, os.path.join(dirpath, filename), + harness.REMOTE_WORKLOADS_PATH.format(name)) + return harness.REMOTE_WORKLOADS_PATH.format(name) diff --git a/benchmarks/harness/tunnel_dispatcher.py b/benchmarks/harness/tunnel_dispatcher.py new file mode 100644 index 000000000..8dfe2862a --- /dev/null +++ b/benchmarks/harness/tunnel_dispatcher.py @@ -0,0 +1,82 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tunnel handles setting up connections to remote machines.""" + +import os +import tempfile +import time + +import docker +import pexpect + +SSH_TUNNEL_COMMAND = """ssh + -o GlobalKnownHostsFile=/dev/null + -o UserKnownHostsFile=/dev/null + -o StrictHostKeyChecking=no + -nNT -L {filename}:/var/run/docker.sock + -i {key_path} + {username}@{hostname}""" + + +class Tunnel: + """The tunnel object represents the tunnel via ssh. + + This connects a local unix domain socket with a remote socket. + """ + + def __init__(self, name, hostname: str, username: str, key_path: str, + **kwargs): + self._filename = tempfile.NamedTemporaryFile(prefix=name).name + self._hostname = hostname + self._username = username + self._key_path = key_path + self._kwargs = kwargs + self._process = None + + def connect(self): + """Connects the SSH tunnel.""" + cmd = SSH_TUNNEL_COMMAND.format( + filename=self._filename, + key_path=self._key_path, + username=self._username, + hostname=self._hostname) + self._process = pexpect.spawn(cmd, timeout=10) + + # If given a password, assume we'll be asked for it. + if "key_password" in self._kwargs: + self._process.expect(["Enter passphrase for key .*: "]) + self._process.sendline(self._kwargs["key_password"]) + + while True: + # Wait for the tunnel to appear. + if self._process.exitstatus is not None: + raise ConnectionError("Error in setting up ssh tunnel") + if os.path.exists(self._filename): + return + time.sleep(0.1) + + def path(self): + """Return the socket file.""" + return self._filename + + def get_docker_client(self): + """Returns a docker client for this Tunne0l.""" + return docker.DockerClient(base_url="unix:/" + self._filename) + + def __del__(self): + """Closes the ssh connection process and deletes the socket file.""" + if self._process: + self._process.close() + if os.path.exists(self._filename): + os.remove(self._filename) -- cgit v1.2.3