diff options
Diffstat (limited to 'benchmarks/harness')
19 files changed, 2010 insertions, 0 deletions
diff --git a/benchmarks/harness/BUILD b/benchmarks/harness/BUILD new file mode 100644 index 000000000..081a74243 --- /dev/null +++ b/benchmarks/harness/BUILD @@ -0,0 +1,92 @@ +load("//benchmarks:defs.bzl", "py_library", "requirement") + +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "harness", + srcs = ["__init__.py"], +) + +py_library( + name = "benchmark_driver", + srcs = ["benchmark_driver.py"], + deps = [ + "//benchmarks/harness/machine_mocks", + "//benchmarks/harness/machine_producers:machine_producer", + "//benchmarks/suites", + ], +) + +py_library( + name = "container", + srcs = ["container.py"], + deps = [ + "//benchmarks/workloads", + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) + +py_library( + name = "machine", + srcs = ["machine.py"], + deps = [ + "//benchmarks/harness", + "//benchmarks/harness:container", + "//benchmarks/harness:ssh_connection", + "//benchmarks/harness:tunnel_dispatcher", + "//benchmarks/harness/machine_mocks", + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("six", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) + +py_library( + name = "ssh_connection", + srcs = ["ssh_connection.py"], + deps = [ + "//benchmarks/harness", + requirement("bcrypt", False), + requirement("cffi", True), + requirement("paramiko", True), + requirement("cryptography", False), + ], +) + +py_library( + name = "tunnel_dispatcher", + srcs = ["tunnel_dispatcher.py"], + deps = [ + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("pexpect", True), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) diff --git a/benchmarks/harness/__init__.py b/benchmarks/harness/__init__.py new file mode 100644 index 000000000..61fd25f73 --- /dev/null +++ b/benchmarks/harness/__init__.py @@ -0,0 +1,32 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Core benchmark utilities.""" + +import getpass +import os + +# LOCAL_WORKLOADS_PATH defines the path to use for local workloads. This is a +# format string that accepts a single string parameter. +LOCAL_WORKLOADS_PATH = os.path.join( + os.path.dirname(__file__), "../workloads/{}/tar.tar") + +# REMOTE_WORKLOADS_PATH defines the path to use for storing the workloads on the +# remote host. This is a format string that accepts a single string parameter. +REMOTE_WORKLOADS_PATH = "workloads/{}" + +# DEFAULT_USER is the default user running this script. +DEFAULT_USER = getpass.getuser() + +# DEFAULT_USER_HOME is the home directory of the user running the script. +DEFAULT_USER_HOME = os.environ["HOME"] if "HOME" in os.environ else "" diff --git a/benchmarks/harness/benchmark_driver.py b/benchmarks/harness/benchmark_driver.py new file mode 100644 index 000000000..9abc21b54 --- /dev/null +++ b/benchmarks/harness/benchmark_driver.py @@ -0,0 +1,85 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Main driver for benchmarks.""" + +import copy +import statistics +import threading +import types + +from benchmarks import suites +from benchmarks.harness.machine_producers import machine_producer + + +# pylint: disable=too-many-instance-attributes +class BenchmarkDriver: + """Allocates machines and invokes a benchmark method.""" + + def __init__(self, + producer: machine_producer.MachineProducer, + method: types.FunctionType, + runs: int = 1, + **kwargs): + + self._producer = producer + self._method = method + self._kwargs = copy.deepcopy(kwargs) + self._threads = [] + self.lock = threading.RLock() + self._runs = runs + self._metric_results = {} + + def start(self): + """Starts a benchmark thread.""" + for _ in range(self._runs): + thread = threading.Thread(target=self._run_method) + thread.start() + self._threads.append(thread) + + def join(self): + """Joins the thread.""" + # pylint: disable=expression-not-assigned + [t.join() for t in self._threads] + + def _run_method(self): + """Runs all benchmarks.""" + machines = self._producer.get_machines( + suites.benchmark_machines(self._method)) + try: + result = self._method(*machines, **self._kwargs) + for name, res in result: + with self.lock: + if name in self._metric_results: + self._metric_results[name].append(res) + else: + self._metric_results[name] = [res] + finally: + # Always release. + self._producer.release_machines(machines) + + def median(self): + """Returns the median result, after join is finished.""" + for key, value in self._metric_results.items(): + yield key, [statistics.median(value)] + + def all(self): + """Returns all results.""" + for key, value in self._metric_results.items(): + yield key, value + + def meanstd(self): + """Returns all results.""" + for key, value in self._metric_results.items(): + mean = statistics.mean(value) + yield key, [mean, statistics.stdev(value, xbar=mean)] diff --git a/benchmarks/harness/container.py b/benchmarks/harness/container.py new file mode 100644 index 000000000..585436e20 --- /dev/null +++ b/benchmarks/harness/container.py @@ -0,0 +1,181 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Container definitions.""" + +import contextlib +import logging +import pydoc +import types +from typing import Tuple + +import docker +import docker.errors + +from benchmarks import workloads + + +class Container: + """Abstract container. + + Must be a context manager. + + Usage: + + with Container(client, image, ...): + ... + """ + + def run(self, **env) -> str: + """Run the container synchronously.""" + raise NotImplementedError + + def detach(self, **env): + """Run the container asynchronously.""" + raise NotImplementedError + + def address(self) -> Tuple[str, int]: + """Return the bound address for the container.""" + raise NotImplementedError + + def get_names(self) -> types.GeneratorType: + """Return names of all containers.""" + raise NotImplementedError + + +# pylint: disable=too-many-instance-attributes +class DockerContainer(Container): + """Class that handles creating a docker container.""" + + # pylint: disable=too-many-arguments + def __init__(self, + client: docker.DockerClient, + host: str, + image: str, + count: int = 1, + runtime: str = "runc", + port: int = 0, + **kwargs): + """Trys to setup "count" containers. + + Args: + client: A docker client from dockerpy. + host: The host address the image is running on. + image: The name of the image to run. + count: The number of containers to setup. + runtime: The container runtime to use. + port: The port to reserve. + **kwargs: Additional container options. + """ + assert count >= 1 + assert port == 0 or count == 1 + self._client = client + self._host = host + self._containers = [] + self._count = count + self._image = image + self._runtime = runtime + self._port = port + self._kwargs = kwargs + if port != 0: + self._ports = {"%d/tcp" % port: None} + else: + self._ports = {} + + @contextlib.contextmanager + def detach(self, **env): + env = ["%s=%s" % (key, value) for (key, value) in env.items()] + # Start all containers. + for _ in range(self._count): + try: + # Start the container in a detached mode. + container = self._client.containers.run( + self._image, + detach=True, + remove=True, + runtime=self._runtime, + ports=self._ports, + environment=env, + **self._kwargs) + logging.info("Started detached container %s -> %s", self._image, + container.attrs["Id"]) + self._containers.append(container) + except Exception as exc: + self._clean_containers() + raise exc + try: + # Wait for all containers to be up. + for container in self._containers: + while not container.attrs["State"]["Running"]: + container = self._client.containers.get(container.attrs["Id"]) + yield self + finally: + self._clean_containers() + + def address(self) -> Tuple[str, int]: + assert self._count == 1 + assert self._port != 0 + container = self._client.containers.get(self._containers[0].attrs["Id"]) + port = container.attrs["NetworkSettings"]["Ports"][ + "%d/tcp" % self._port][0]["HostPort"] + return (self._host, port) + + def get_names(self) -> types.GeneratorType: + for container in self._containers: + yield container.name + + def run(self, **env) -> str: + env = ["%s=%s" % (key, value) for (key, value) in env.items()] + return self._client.containers.run( + self._image, + runtime=self._runtime, + ports=self._ports, + remove=True, + environment=env, + **self._kwargs).decode("utf-8") + + def _clean_containers(self): + """Kills all containers.""" + for container in self._containers: + try: + container.kill() + except docker.errors.NotFound: + pass + + +class MockContainer(Container): + """Mock of Container.""" + + def __init__(self, workload: str): + self._workload = workload + + def __enter__(self): + return self + + def run(self, **env): + # Lookup sample data if any exists for the workload module. We use a + # well-defined test locate and a well-defined sample function. + mod = pydoc.locate(workloads.__name__ + "." + self._workload) + if hasattr(mod, "sample"): + return mod.sample(**env) + return "" # No output. + + def address(self) -> Tuple[str, int]: + return ("example.com", 80) + + def get_names(self) -> types.GeneratorType: + yield "mock" + + @contextlib.contextmanager + def detach(self, **env): + yield self diff --git a/benchmarks/harness/machine.py b/benchmarks/harness/machine.py new file mode 100644 index 000000000..2df4c9e31 --- /dev/null +++ b/benchmarks/harness/machine.py @@ -0,0 +1,229 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Machine abstraction passed to benchmarks to run docker containers. + +Abstraction for interacting with test machines. Machines are produced +by Machine producers and represent a local or remote machine. Benchmark +methods in /benchmarks/suite are passed the required number of machines in order +to run the benchmark. Machines contain methods to run commands via bash, +possibly over ssh. Machines also hold a connection to the docker UNIX socket +to run contianers. + + Typical usage example: + + machine = Machine() + machine.run(cmd) + machine.pull(path) + container = machine.container() +""" + +import logging +import re +import subprocess +import time +from typing import Tuple + +import docker + +from benchmarks import harness +from benchmarks.harness import container +from benchmarks.harness import machine_mocks +from benchmarks.harness import ssh_connection +from benchmarks.harness import tunnel_dispatcher + + +class Machine(object): + """The machine object is the primary object for benchmarks. + + Machine objects are passed to each metric function call and benchmarks use + machines to access real connections to those machines. + + Attributes: + _name: Name as a string + """ + _name = "" + + def run(self, cmd: str) -> Tuple[str, str]: + """Convenience method for running a bash command on a machine object. + + Some machines may point to the local machine, and thus, do not have ssh + connections. Run runs a command either local or over ssh and returns the + output stdout and stderr as strings. + + Args: + cmd: The command to run as a string. + + Returns: + The command output. + """ + raise NotImplementedError + + def read(self, path: str) -> str: + """Reads the contents of some file. + + This will be mocked. + + Args: + path: The path to the file to be read. + + Returns: + The file contents. + """ + raise NotImplementedError + + def pull(self, workload: str) -> str: + """Send the given workload to the machine, build and tag it. + + All images must be defined by the workloads directory. + + Args: + workload: The workload name. + + Returns: + The workload tag. + """ + raise NotImplementedError + + def container(self, image: str, **kwargs) -> container.Container: + """Returns a container object. + + Args: + image: The pulled image tag. + **kwargs: Additional container options. + + Returns: + :return: a container.Container object. + """ + raise NotImplementedError + + def sleep(self, amount: float): + """Sleeps the given amount of time.""" + time.sleep(amount) + + def __str__(self): + return self._name + + +class MockMachine(Machine): + """A mocked machine.""" + _name = "mock" + + def run(self, cmd: str) -> Tuple[str, str]: + return "", "" + + def read(self, path: str) -> str: + return machine_mocks.Readfile(path) + + def pull(self, workload: str) -> str: + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + return container.MockContainer(image) + + def sleep(self, amount: float): + pass + + +def get_address(machine: Machine) -> str: + """Return a machine's default address.""" + default_route, _ = machine.run("ip route get 8.8.8.8") + return re.search(" src ([0-9.]+) ", default_route).group(1) + + +class LocalMachine(Machine): + """The local machine. + + Attributes: + _name: Name as a string + _docker_client: a pythonic connection to to the local dockerd unix socket. + See: https://github.com/docker/docker-py + """ + + def __init__(self, name): + self._name = name + self._docker_client = docker.from_env() + + def run(self, cmd: str) -> Tuple[str, str]: + process = subprocess.Popen( + cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + return stdout.decode("utf-8"), stderr.decode("utf-8") + + def read(self, path: str) -> bytes: + # Read the exact path locally. + return open(path, "r").read() + + def pull(self, workload: str) -> str: + # Run the docker build command locally. + logging.info("Building %s@%s locally...", workload, self._name) + with open(harness.LOCAL_WORKLOADS_PATH.format(workload), + "rb") as dockerfile: + self._docker_client.images.build( + fileobj=dockerfile, tag=workload, custom_context=True) + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + # Return a local docker container directly. + return container.DockerContainer(self._docker_client, get_address(self), + image, **kwargs) + + def sleep(self, amount: float): + time.sleep(amount) + + +class RemoteMachine(Machine): + """Remote machine accessible via an SSH connection. + + Attributes: + _name: Name as a string + _ssh_connection: a paramiko backed ssh connection which can be used to run + commands on this machine + _tunnel: a python wrapper around a port forwarded ssh connection between a + local unix socket and the remote machine's dockerd unix socket. + _docker_client: a pythonic wrapper backed by the _tunnel. Allows sending + docker commands: see https://github.com/docker/docker-py + """ + + def __init__(self, name, **kwargs): + self._name = name + self._ssh_connection = ssh_connection.SSHConnection(name, **kwargs) + self._tunnel = tunnel_dispatcher.Tunnel(name, **kwargs) + self._tunnel.connect() + self._docker_client = self._tunnel.get_docker_client() + + def run(self, cmd: str) -> Tuple[str, str]: + return self._ssh_connection.run(cmd) + + def read(self, path: str) -> str: + # Just cat remotely. + stdout, stderr = self._ssh_connection.run("cat '{}'".format(path)) + return stdout + stderr + + def pull(self, workload: str) -> str: + # Push to the remote machine and build. + logging.info("Building %s@%s remotely...", workload, self._name) + remote_path = self._ssh_connection.send_workload(workload) + # Workloads are all tarballs. + self.run("tar -xvf {remote_path}/tar.tar -C {remote_path}".format( + remote_path=remote_path)) + self.run("docker build --tag={} {}".format(workload, remote_path)) + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + # Return a remote docker container. + return container.DockerContainer(self._docker_client, get_address(self), + image, **kwargs) + + def sleep(self, amount: float): + time.sleep(amount) diff --git a/benchmarks/harness/machine_mocks/BUILD b/benchmarks/harness/machine_mocks/BUILD new file mode 100644 index 000000000..c8ec4bc79 --- /dev/null +++ b/benchmarks/harness/machine_mocks/BUILD @@ -0,0 +1,9 @@ +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "machine_mocks", + srcs = ["__init__.py"], +) diff --git a/benchmarks/harness/machine_mocks/__init__.py b/benchmarks/harness/machine_mocks/__init__.py new file mode 100644 index 000000000..00f0085d7 --- /dev/null +++ b/benchmarks/harness/machine_mocks/__init__.py @@ -0,0 +1,81 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Machine mock files.""" + +MEMINFO = """\ +MemTotal: 7652344 kB +MemFree: 7174724 kB +MemAvailable: 7152008 kB +Buffers: 7544 kB +Cached: 178856 kB +SwapCached: 0 kB +Active: 270928 kB +Inactive: 68436 kB +Active(anon): 153124 kB +Inactive(anon): 880 kB +Active(file): 117804 kB +Inactive(file): 67556 kB +Unevictable: 0 kB +Mlocked: 0 kB +SwapTotal: 0 kB +SwapFree: 0 kB +Dirty: 900 kB +Writeback: 0 kB +AnonPages: 153000 kB +Mapped: 129120 kB +Shmem: 1044 kB +Slab: 60864 kB +SReclaimable: 22792 kB +SUnreclaim: 38072 kB +KernelStack: 2672 kB +PageTables: 5756 kB +NFS_Unstable: 0 kB +Bounce: 0 kB +WritebackTmp: 0 kB +CommitLimit: 3826172 kB +Committed_AS: 663836 kB +VmallocTotal: 34359738367 kB +VmallocUsed: 0 kB +VmallocChunk: 0 kB +HardwareCorrupted: 0 kB +AnonHugePages: 0 kB +ShmemHugePages: 0 kB +ShmemPmdMapped: 0 kB +CmaTotal: 0 kB +CmaFree: 0 kB +HugePages_Total: 0 +HugePages_Free: 0 +HugePages_Rsvd: 0 +HugePages_Surp: 0 +Hugepagesize: 2048 kB +DirectMap4k: 94196 kB +DirectMap2M: 4624384 kB +DirectMap1G: 3145728 kB +""" + +CONTENTS = { + "/proc/meminfo": MEMINFO, +} + + +def Readfile(path: str) -> str: + """Reads a mock file. + + Args: + path: The target path. + + Returns: + Mocked file contents or None. + """ + return CONTENTS.get(path, None) diff --git a/benchmarks/harness/machine_producers/BUILD b/benchmarks/harness/machine_producers/BUILD new file mode 100644 index 000000000..c4e943882 --- /dev/null +++ b/benchmarks/harness/machine_producers/BUILD @@ -0,0 +1,80 @@ +load("//benchmarks:defs.bzl", "py_library", "requirement") + +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "harness", + srcs = ["__init__.py"], +) + +py_library( + name = "machine_producer", + srcs = ["machine_producer.py"], +) + +py_library( + name = "mock_producer", + srcs = ["mock_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:gcloud_producer", + "//benchmarks/harness/machine_producers:machine_producer", + ], +) + +py_library( + name = "yaml_producer", + srcs = ["yaml_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:machine_producer", + requirement("PyYAML", False), + ], +) + +py_library( + name = "gcloud_mock_recorder", + srcs = ["gcloud_mock_recorder.py"], +) + +py_library( + name = "gcloud_producer", + srcs = ["gcloud_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:gcloud_mock_recorder", + "//benchmarks/harness/machine_producers:machine_producer", + ], +) + +filegroup( + name = "test_data", + srcs = [ + "testdata/get_five.json", + "testdata/get_one.json", + ], +) + +py_library( + name = "gcloud_producer_test_lib", + srcs = ["gcloud_producer_test.py"], + deps = [ + "//benchmarks/harness/machine_producers:machine_producer", + "//benchmarks/harness/machine_producers:mock_producer", + ], +) + +py_test( + name = "gcloud_producer_test", + srcs = [":gcloud_producer_test_lib"], + data = [ + ":test_data", + ], + python_version = "PY3", + tags = [ + "local", + ], +) diff --git a/benchmarks/harness/machine_producers/__init__.py b/benchmarks/harness/machine_producers/__init__.py new file mode 100644 index 000000000..634ef4843 --- /dev/null +++ b/benchmarks/harness/machine_producers/__init__.py @@ -0,0 +1,13 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/benchmarks/harness/machine_producers/gcloud_mock_recorder.py b/benchmarks/harness/machine_producers/gcloud_mock_recorder.py new file mode 100644 index 000000000..fd9837a37 --- /dev/null +++ b/benchmarks/harness/machine_producers/gcloud_mock_recorder.py @@ -0,0 +1,97 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A recorder and replay for testing the GCloudProducer. + +MockPrinter and MockReader handle printing and reading mock data for the +purposes of testing. MockPrinter is passed to GCloudProducer objects. The user +can then run scenarios and record them for playback in tests later. + +MockReader is passed to MockGcloudProducer objects and handles reading the +previously recorded mock data. + +It is left to the user to check if data printed is properly redacted for their +own use. The intended usecase for this class is data coming from gcloud +commands, which will contain public IPs and other instance data. + +The data format is json and printed/read from the ./test_data directory. The +data is the output of subprocess.CompletedProcess objects in json format. + + Typical usage example: + + recorder = MockPrinter() + producer = GCloudProducer(args, recorder) + machines = producer.get_machines(1) + with open("my_file.json") as fd: + recorder.write_out(fd) + + reader = MockReader(filename) + producer = MockGcloudProducer(args, mock) + machines = producer.get_machines(1) + assert len(machines) == 1 +""" + +import io +import json +import subprocess + + +class MockPrinter(object): + """Handles printing Mock data for MockGcloudProducer. + + Attributes: + _records: list of json object records for printing + """ + + def __init__(self): + self._records = [] + + def record(self, entry: subprocess.CompletedProcess): + """Records data and strips out ip addresses.""" + + record = { + "args": entry.args, + "stdout": entry.stdout.decode("utf-8"), + "returncode": str(entry.returncode) + } + self._records.append(record) + + def write_out(self, fd: io.FileIO): + """Prints out the data into the given filepath.""" + fd.write(json.dumps(self._records, indent=4)) + + +class MockReader(object): + """Handles reading Mock data for MockGcloudProducer. + + Attributes: + _records: List[json] records read from the passed in file. + """ + + def __init__(self, filepath: str): + with open(filepath, "rb") as file: + self._records = json.loads(file.read()) + self._i = 0 + + def __iter__(self): + return self + + def __next__(self, args) -> subprocess.CompletedProcess: + """Returns the next record as a CompletedProcess.""" + if self._i < len(self._records): + record = self._records[self._i] + stdout = record["stdout"].encode("ascii") + returncode = int(record["returncode"]) + return subprocess.CompletedProcess( + args=args, returncode=returncode, stdout=stdout, stderr=b"") + raise StopIteration() diff --git a/benchmarks/harness/machine_producers/gcloud_producer.py b/benchmarks/harness/machine_producers/gcloud_producer.py new file mode 100644 index 000000000..e0b77d52b --- /dev/null +++ b/benchmarks/harness/machine_producers/gcloud_producer.py @@ -0,0 +1,268 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A machine producer which produces machine objects using `gcloud`. + +Machine producers produce valid harness.Machine objects which are backed by +real machines. This producer produces those machines on the given user's GCP +account using the `gcloud` tool. + +GCloudProducer creates instances on the given GCP account named like: +`machine-XXXXXXX-XXXX-XXXX-XXXXXXXXXXXX` in a randomized fashion such that name +collisions with user instances shouldn't happen. + + Typical usage example: + + producer = GCloudProducer(args) + machines = producer.get_machines(NUM_MACHINES) + # run stuff on machines with machines[i].run(CMD) + producer.release_machines(NUM_MACHINES) +""" +import datetime +import json +import subprocess +import threading +from typing import List, Dict, Any +import uuid + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import gcloud_mock_recorder +from benchmarks.harness.machine_producers import machine_producer + + +class GCloudProducer(machine_producer.MachineProducer): + """Implementation of MachineProducer backed by GCP. + + Produces Machine objects backed by GCP instances. + + Attributes: + project: The GCP project name under which to create the machines. + ssh_key_file: path to a valid ssh private key. See README on vaild ssh keys. + image: image name as a string. + image_project: image project as a string. + machine_type: type of GCP to create. e.g. n1-standard-4 + zone: string to a valid GCP zone. + ssh_user: string of user name for ssh_key + ssh_password: string of password for ssh key + mock: a mock printer which will print mock data if required. Mock data is + recorded output from subprocess calls (returncode, stdout, args). + condition: mutex for this class around machine creation and deleteion. + """ + + def __init__(self, + project: str, + ssh_key_file: str, + image: str, + image_project: str, + machine_type: str, + zone: str, + ssh_user: str, + ssh_password: str, + mock: gcloud_mock_recorder.MockPrinter = None): + self.project = project + self.ssh_key_file = ssh_key_file + self.image = image + self.image_project = image_project + self.machine_type = machine_type + self.zone = zone + self.ssh_user = ssh_user + self.ssh_password = ssh_password + self.mock = mock + self.condition = threading.Condition() + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + """Returns requested number of machines backed by GCP instances.""" + if num_machines <= 0: + raise ValueError( + "Cannot ask for {num} machines!".format(num=num_machines)) + with self.condition: + names = self._get_unique_names(num_machines) + self._build_instances(names) + instances = self._start_command(names) + self._add_ssh_key_to_instances(names) + return self._machines_from_instances(instances) + + def release_machines(self, machine_list: List[machine.Machine]): + """Releases the requested number of machines, deleting the instances.""" + if not machine_list: + return + cmd = "gcloud compute instances delete --quiet".split(" ") + names = [str(m) for m in machine_list] + cmd.extend(names) + cmd.append("--zone={zone}".format(zone=self.zone)) + self._run_command(cmd, detach=True) + + def _machines_from_instances( + self, instances: List[Dict[str, Any]]) -> List[machine.Machine]: + """Creates Machine Objects from json data describing created instances.""" + machines = [] + for instance in instances: + name = instance["name"] + kwargs = { + "hostname": + instance["networkInterfaces"][0]["accessConfigs"][0]["natIP"], + "key_path": + self.ssh_key_file, + "username": + self.ssh_user, + "key_password": + self.ssh_password + } + machines.append(machine.RemoteMachine(name=name, **kwargs)) + return machines + + def _get_unique_names(self, num_names) -> List[str]: + """Returns num_names unique names based on data from the GCP project.""" + curr_machines = self._list_machines() + curr_names = set([machine["name"] for machine in curr_machines]) + ret = [] + while len(ret) < num_names: + new_name = "machine-" + str(uuid.uuid4()) + if new_name not in curr_names: + ret.append(new_name) + curr_names.update(new_name) + return ret + + def _build_instances(self, names: List[str]) -> List[Dict[str, Any]]: + """Creates instances using gcloud command. + + Runs the command `gcloud compute instances create` and returns json data + on created instances on success. Creates len(names) instances, one for each + name. + + Args: + names: list of names of instances to create. + + Returns: + List of json data describing created machines. + """ + if not names: + raise ValueError( + "_build_instances cannot create instances without names.") + cmd = "gcloud compute instances create".split(" ") + cmd.extend(names) + cmd.extend( + "--preemptible --image={image} --zone={zone} --machine-type={machine_type}" + .format( + image=self.image, zone=self.zone, + machine_type=self.machine_type).split(" ")) + if self.image_project: + cmd.append("--image-project={project}".format(project=self.image_project)) + res = self._run_command(cmd) + return json.loads(res.stdout) + + def _start_command(self, names): + """Starts instances using gcloud command. + + Runs the command `gcloud compute instances start` on list of instances by + name and returns json data on started instances on success. + + Args: + names: list of names of instances to start. + + Returns: + List of json data describing started machines. + """ + if not names: + raise ValueError("_start_command cannot start empty instance list.") + cmd = "gcloud compute instances start".split(" ") + cmd.extend(names) + cmd.append("--zone={zone}".format(zone=self.zone)) + cmd.append("--project={project}".format(project=self.project)) + res = self._run_command(cmd) + return json.loads(res.stdout) + + def _add_ssh_key_to_instances(self, names: List[str]) -> None: + """Adds ssh key to instances by calling gcloud ssh command. + + Runs the command `gcloud compute ssh instance_name` on list of images by + name. Tries to ssh into given instance + + Args: + names: list of machine names to which to add the ssh-key + self.ssh_key_file. + + Raises: + subprocess.CalledProcessError: when underlying subprocess call returns an + error other than 255 (Connection closed by remote host). + TimeoutError: when 3 unsuccessful tries to ssh into the host return 255. + """ + for name in names: + cmd = "gcloud compute ssh {name}".format(name=name).split(" ") + cmd.append("--ssh-key-file={key}".format(key=self.ssh_key_file)) + cmd.append("--zone={zone}".format(zone=self.zone)) + cmd.append("--command=uname") + timeout = datetime.timedelta(seconds=5 * 60) + start = datetime.datetime.now() + while datetime.datetime.now() <= timeout + start: + try: + self._run_command(cmd) + break + except subprocess.CalledProcessError as e: + if datetime.datetime.now() > timeout + start: + raise TimeoutError( + "Could not SSH into instance after 5 min: {name}".format( + name=name)) + # 255 is the returncode for ssh connection refused. + elif e.returncode == 255: + + continue + else: + raise e + + def _list_machines(self) -> List[Dict[str, Any]]: + """Runs `list` gcloud command and returns list of Machine data.""" + cmd = "gcloud compute instances list --project {project}".format( + project=self.project).split(" ") + res = self._run_command(cmd) + return json.loads(res.stdout) + + def _run_command(self, + cmd: List[str], + detach: bool = False) -> [None, subprocess.CompletedProcess]: + """Runs command as a subprocess. + + Runs command as subprocess and returns the result. + If this has a mock recorder, use the record method to record the subprocess + call. + + Args: + cmd: command to be run as a list of strings. + detach: if True, run the child process and don't wait for it to return. + + Returns: + Completed process object to be parsed by caller or None if detach=True. + + Raises: + CalledProcessError: if subprocess.run returns an error. + """ + cmd = cmd + ["--format=json"] + if detach: + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if self.mock: + out, _ = p.communicate() + self.mock.record( + subprocess.CompletedProcess( + returncode=p.returncode, stdout=out, args=p.args)) + return + + res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if self.mock: + self.mock.record(res) + if res.returncode != 0: + raise subprocess.CalledProcessError( + cmd=res.args, + output=res.stdout, + stderr=res.stderr, + returncode=res.returncode) + return res diff --git a/benchmarks/harness/machine_producers/gcloud_producer_test.py b/benchmarks/harness/machine_producers/gcloud_producer_test.py new file mode 100644 index 000000000..c8adb2bdc --- /dev/null +++ b/benchmarks/harness/machine_producers/gcloud_producer_test.py @@ -0,0 +1,48 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests GCloudProducer using mock data. + +GCloudProducer produces machines using 'get_machines' and 'release_machines' +methods. The tests check recorded data (jsonified subprocess.CompletedProcess +objects) of the producer producing one and five machines. +""" +import os +import types + +from benchmarks.harness.machine_producers import machine_producer +from benchmarks.harness.machine_producers import mock_producer + +TEST_DIR = os.path.dirname(__file__) + + +def run_get_release(producer: machine_producer.MachineProducer, + num_machines: int, + validator: types.FunctionType = None): + machines = producer.get_machines(num_machines) + assert len(machines) == num_machines + if validator: + validator(machines=machines, cmd="uname -a", workload=None) + producer.release_machines(machines) + + +def test_run_one(): + mock = mock_producer.MockReader(TEST_DIR + "get_one.json") + producer = mock_producer.MockGCloudProducer(mock) + run_get_release(producer, 1) + + +def test_run_five(): + mock = mock_producer.MockReader(TEST_DIR + "get_five.json") + producer = mock_producer.MockGCloudProducer(mock) + run_get_release(producer, 5) diff --git a/benchmarks/harness/machine_producers/machine_producer.py b/benchmarks/harness/machine_producers/machine_producer.py new file mode 100644 index 000000000..f5591c026 --- /dev/null +++ b/benchmarks/harness/machine_producers/machine_producer.py @@ -0,0 +1,51 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Abstract types.""" + +import threading +from typing import List + +from benchmarks.harness import machine + + +class MachineProducer: + """Abstract Machine producer.""" + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + """Returns the requested number of machines.""" + raise NotImplementedError + + def release_machines(self, machine_list: List[machine.Machine]): + """Releases the given set of machines.""" + raise NotImplementedError + + +class LocalMachineProducer(MachineProducer): + """Produces Local Machines.""" + + def __init__(self, limit: int): + self.limit_sem = threading.Semaphore(value=limit) + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + """Returns the request number of MockMachines.""" + + self.limit_sem.acquire() + return [machine.LocalMachine("local") for _ in range(num_machines)] + + def release_machines(self, machine_list: List[machine.MockMachine]): + """No-op.""" + if not machine_list: + raise ValueError("Cannot release an empty list!") + self.limit_sem.release() + machine_list.clear() diff --git a/benchmarks/harness/machine_producers/mock_producer.py b/benchmarks/harness/machine_producers/mock_producer.py new file mode 100644 index 000000000..37e9cb4b7 --- /dev/null +++ b/benchmarks/harness/machine_producers/mock_producer.py @@ -0,0 +1,52 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Producers of mocks.""" + +from typing import List, Any + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import gcloud_mock_recorder +from benchmarks.harness.machine_producers import gcloud_producer +from benchmarks.harness.machine_producers import machine_producer + + +class MockMachineProducer(machine_producer.MachineProducer): + """Produces MockMachine objects.""" + + def get_machines(self, num_machines: int) -> List[machine.MockMachine]: + """Returns the request number of MockMachines.""" + return [machine.MockMachine() for i in range(num_machines)] + + def release_machines(self, machine_list: List[machine.MockMachine]): + """No-op.""" + return + + +class MockGCloudProducer(gcloud_producer.GCloudProducer): + """Mocks GCloudProducer for testing purposes.""" + + def __init__(self, mock: gcloud_mock_recorder.MockReader, **kwargs): + gcloud_producer.GCloudProducer.__init__( + self, project="mock", ssh_private_key_path="mock", **kwargs) + self.mock = mock + + def _validate_ssh_file(self): + pass + + def _run_command(self, cmd): + return self.mock.pop(cmd) + + def _machines_from_instances( + self, instances: List[Any]) -> List[machine.MockMachine]: + return [machine.MockMachine() for _ in instances] diff --git a/benchmarks/harness/machine_producers/testdata/get_five.json b/benchmarks/harness/machine_producers/testdata/get_five.json new file mode 100644 index 000000000..32bad1b06 --- /dev/null +++ b/benchmarks/harness/machine_producers/testdata/get_five.json @@ -0,0 +1,211 @@ +[ + { + "args": [ + "gcloud", + "compute", + "instances", + "list", + "--project", + "project", + "--format=json" + ], + "stdout": "[{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":{\"natIP\":\"0.0.0.0\"}]}]}]", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "instances", + "create", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "machine-5f28f145-cc2d-427d-9cbf-428d164cdb92", + "machine-da5859b5-bae6-435d-8005-0202d6f6e065", + "machine-880a8a2f-918c-4f9e-a43c-ed3c8e02ea05", + "machine-1149147d-71e2-43ea-8fe1-49256e5c441c", + "--preemptible", + "--image=ubuntu-1910-eoan-v20191204", + "--zone=us-west1-b", + "--image-project=ubuntu-os-cloud", + "--format=json" + ], + "stdout": "[{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]}]", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "instances", + "start", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "machine-5f28f145-cc2d-427d-9cbf-428d164cdb92", + "machine-da5859b5-bae6-435d-8005-0202d6f6e065", + "machine-880a8a2f-918c-4f9e-a43c-ed3c8e02ea05", + "machine-1149147d-71e2-43ea-8fe1-49256e5c441c", + "--zone=us-west1-b", + "--project=project", + "--format=json" + ], + "stdout": "[{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]},{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]}]", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "Linux\n[]\n", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-5f28f145-cc2d-427d-9cbf-428d164cdb92", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "Linux\n[]\n", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-da5859b5-bae6-435d-8005-0202d6f6e065", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "Linux\n[]\n", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-880a8a2f-918c-4f9e-a43c-ed3c8e02ea05", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "Linux\n[]\n", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-1149147d-71e2-43ea-8fe1-49256e5c441c", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "Linux\n[]\n", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "instances", + "delete", + "--quiet", + "machine-42c9bf6e-8d45-4c37-b1c0-7e4fdcf530fc", + "machine-5f28f145-cc2d-427d-9cbf-428d164cdb92", + "machine-da5859b5-bae6-435d-8005-0202d6f6e065", + "machine-880a8a2f-918c-4f9e-a43c-ed3c8e02ea05", + "machine-1149147d-71e2-43ea-8fe1-49256e5c441c", + "--zone=us-west1-b", + "--format=json" + ], + "stdout": "[]\n", + "returncode": "0" + } +] diff --git a/benchmarks/harness/machine_producers/testdata/get_one.json b/benchmarks/harness/machine_producers/testdata/get_one.json new file mode 100644 index 000000000..c359c19c8 --- /dev/null +++ b/benchmarks/harness/machine_producers/testdata/get_one.json @@ -0,0 +1,145 @@ +[ + { + "args": [ + "gcloud", + "compute", + "instances", + "list", + "--project", + "linux-testing-user", + "--format=json" + ], + "stdout": "[{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]}]", + + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "instances", + "create", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--preemptible", + "--image=ubuntu-1910-eoan-v20191204", + "--zone=us-west1-b", + "--image-project=ubuntu-os-cloud", + "--format=json" + ], + "stdout": "[{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]}]", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "instances", + "start", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--zone=us-west1-b", + "--project=linux-testing-user", + "--format=json" + ], + "stdout": "[{\"name\":\"name\", \"networkInterfaces\":[{\"accessConfigs\":[{\"natIP\":\"0.0.0.0\"}]}]}]", + + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "", + "returncode": "255" + }, + { + "args": [ + "gcloud", + "compute", + "ssh", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--ssh-key-file=/usr/local/google/home/user/.ssh/benchmark-tools", + "--zone=us-west1-b", + "--command=uname", + "--format=json" + ], + "stdout": "Linux\n[]\n", + "returncode": "0" + }, + { + "args": [ + "gcloud", + "compute", + "instances", + "delete", + "--quiet", + "machine-129dfcf9-b05b-4c16-a4cd-21353b570ddc", + "--zone=us-west1-b", + "--format=json" + ], + "stdout": "[]\n", + "returncode": "0" + } +] diff --git a/benchmarks/harness/machine_producers/yaml_producer.py b/benchmarks/harness/machine_producers/yaml_producer.py new file mode 100644 index 000000000..5d334e480 --- /dev/null +++ b/benchmarks/harness/machine_producers/yaml_producer.py @@ -0,0 +1,106 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Producers based on yaml files.""" + +import os +import threading +from typing import Dict +from typing import List + +import yaml + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import machine_producer + + +class YamlMachineProducer(machine_producer.MachineProducer): + """Loads machines from a yaml file.""" + + def __init__(self, path: str): + self.machines = build_machines(path) + self.max_machines = len(self.machines) + self.machine_condition = threading.Condition() + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + if num_machines > self.max_machines: + raise ValueError( + "Insufficient Ammount of Machines. {ask} asked for and have {max_num} max." + .format(ask=num_machines, max_num=self.max_machines)) + + with self.machine_condition: + while not self._enough_machines(num_machines): + self.machine_condition.wait(timeout=1) + return [self.machines.pop(0) for _ in range(num_machines)] + + def release_machines(self, machine_list: List[machine.Machine]): + with self.machine_condition: + while machine_list: + next_machine = machine_list.pop() + self.machines.append(next_machine) + self.machine_condition.notify() + + def _enough_machines(self, ask: int): + return ask <= len(self.machines) + + +def build_machines(path: str, num_machines: str = -1) -> List[machine.Machine]: + """Builds machine objects defined by the yaml file "path". + + Args: + path: The path to a yaml file which defines machines. + num_machines: Optional limit on how many machine objects to build. + + Returns: + Machine objects in a list. + + If num_machines is set, len(machines) <= num_machines. + """ + data = parse_yaml(path) + machines = [] + for key, value in data.items(): + if len(machines) == num_machines: + return machines + if isinstance(value, dict): + machines.append(machine.RemoteMachine(key, **value)) + else: + machines.append(machine.LocalMachine(key)) + return machines + + +def parse_yaml(path: str) -> Dict[str, Dict[str, str]]: + """Parse the yaml file pointed by path. + + Args: + path: The path to yaml file. + + Returns: + The contents of the yaml file as a dictionary. + """ + data = get_file_contents(path) + return yaml.load(data, Loader=yaml.Loader) + + +def get_file_contents(path: str) -> str: + """Dumps the file contents to a string and returns them. + + Args: + path: The path to dump. + + Returns: + The file contents as a string. + """ + if not os.path.isabs(path): + path = os.path.abspath(path) + with open(path) as input_file: + return input_file.read() diff --git a/benchmarks/harness/ssh_connection.py b/benchmarks/harness/ssh_connection.py new file mode 100644 index 000000000..e0bf258f1 --- /dev/null +++ b/benchmarks/harness/ssh_connection.py @@ -0,0 +1,108 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""SSHConnection handles the details of SSH connections.""" + +import os +import warnings + +import paramiko + +from benchmarks import harness + +# Get rid of paramiko Cryptography Warnings. +warnings.filterwarnings(action="ignore", module=".*paramiko.*") + + +def send_one_file(client: paramiko.SSHClient, path: str, remote_dir: str): + """Sends a single file via an SSH client. + + Args: + client: The existing SSH client. + path: The local path. + remote_dir: The remote directory. + """ + filename = path.split("/").pop() + client.exec_command("mkdir -p " + remote_dir) + with client.open_sftp() as ftp_client: + ftp_client.put(path, os.path.join(remote_dir, filename)) + + +class SSHConnection: + """SSH connection to a remote machine.""" + + def __init__(self, name: str, hostname: str, key_path: str, username: str, + **kwargs): + """Sets up a paramiko ssh connection to the given hostname.""" + self._name = name # Unused. + self._hostname = hostname + self._username = username + self._key_path = key_path # RSA Key path + self._kwargs = kwargs + # SSHConnection wraps paramiko. paramiko supports RSA, ECDSA, and Ed25519 + # keys, and we've chosen to only suport and require RSA keys. paramiko + # supports RSA keys that begin with '----BEGIN RSAKEY----'. + # https://stackoverflow.com/questions/53600581/ssh-key-generated-by-ssh-keygen-is-not-recognized-by-paramiko + self.rsa_key = self._rsa() + self.run("true") # Validate. + + def _client(self) -> paramiko.SSHClient: + """Returns a connected SSH client.""" + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect( + hostname=self._hostname, + port=22, + username=self._username, + pkey=self.rsa_key, + allow_agent=False, + look_for_keys=False) + return client + + def _rsa(self): + if "key_password" in self._kwargs: + password = self._kwargs["key_password"] + else: + password = None + rsa = paramiko.RSAKey.from_private_key_file(self._key_path, password) + return rsa + + def run(self, cmd: str) -> (str, str): + """Runs a command via ssh. + + Args: + cmd: The shell command to run. + + Returns: + The contents of stdout and stderr. + """ + with self._client() as client: + _, stdout, stderr = client.exec_command(command=cmd) + stdout.channel.recv_exit_status() + stdout = stdout.read().decode("utf-8") + stderr = stderr.read().decode("utf-8") + return stdout, stderr + + def send_workload(self, name: str) -> str: + """Sends a workload tarball to the remote machine. + + Args: + name: The workload name. + + Returns: + The remote path. + """ + with self._client() as client: + send_one_file(client, harness.LOCAL_WORKLOADS_PATH.format(name), + harness.REMOTE_WORKLOADS_PATH.format(name)) + return harness.REMOTE_WORKLOADS_PATH.format(name) diff --git a/benchmarks/harness/tunnel_dispatcher.py b/benchmarks/harness/tunnel_dispatcher.py new file mode 100644 index 000000000..c56fd022a --- /dev/null +++ b/benchmarks/harness/tunnel_dispatcher.py @@ -0,0 +1,122 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tunnel handles setting up connections to remote machines. + +Tunnel dispatcher is a wrapper around the connection from a local UNIX socket +and a remote UNIX socket via SSH with port forwarding. This is done to +initialize the pythonic dockerpy client to run containers on the remote host by +connecting to /var/run/docker.sock (where Docker is listening). Tunnel +dispatcher sets up the local UNIX socket and calls the `ssh` command as a +subprocess, and holds a reference to that subprocess. It manages clean-up on +exit as best it can by killing the ssh subprocess and deleting the local UNIX +socket,stored in /tmp for easy cleanup in most systems if this fails. + + Typical usage example: + + t = Tunnel(name, **kwargs) + t.connect() + client = t.get_docker_client() # + client.containers.run("ubuntu", "echo hello world") + +""" + +import os +import tempfile +import time + +import docker +import pexpect + +SSH_TUNNEL_COMMAND = """ssh + -o GlobalKnownHostsFile=/dev/null + -o UserKnownHostsFile=/dev/null + -o StrictHostKeyChecking=no + -o IdentitiesOnly=yes + -nNT -L {filename}:/var/run/docker.sock + -i {key_path} + {username}@{hostname}""" + + +class Tunnel(object): + """The tunnel object represents the tunnel via ssh. + + This connects a local unix domain socket with a remote socket. + + Attributes: + _filename: a temporary name of the UNIX socket prefixed by the name + argument. + _hostname: the IP or resolvable hostname of the remote host. + _username: the username of the ssh_key used to run ssh. + _key_path: path to a valid key. + _key_password: optional password to the ssh key in _key_path + _process: holds reference to the ssh subprocess created. + + Returns: + The new minimum port. + + Raises: + ConnectionError: If no available port is found. + """ + + def __init__(self, + name: str, + hostname: str, + username: str, + key_path: str, + key_password: str = "", + **kwargs): + self._filename = tempfile.NamedTemporaryFile(prefix=name).name + self._hostname = hostname + self._username = username + self._key_path = key_path + self._key_password = key_password + self._kwargs = kwargs + self._process = None + + def connect(self): + """Connects the SSH tunnel and stores the subprocess reference in _process.""" + cmd = SSH_TUNNEL_COMMAND.format( + filename=self._filename, + key_path=self._key_path, + username=self._username, + hostname=self._hostname) + self._process = pexpect.spawn(cmd, timeout=10) + + # If given a password, assume we'll be asked for it. + if self._key_password: + self._process.expect(["Enter passphrase for key .*: "]) + self._process.sendline(self._key_password) + + while True: + # Wait for the tunnel to appear. + if self._process.exitstatus is not None: + raise ConnectionError("Error in setting up ssh tunnel") + if os.path.exists(self._filename): + return + time.sleep(0.1) + + def path(self): + """Return the socket file.""" + return self._filename + + def get_docker_client(self): + """Returns a docker client for this Tunnel.""" + return docker.DockerClient(base_url="unix:/" + self._filename) + + def __del__(self): + """Closes the ssh connection process and deletes the socket file.""" + if self._process: + self._process.close() + if os.path.exists(self._filename): + os.remove(self._filename) |