diff options
Diffstat (limited to 'benchmarks/harness')
-rw-r--r-- | benchmarks/harness/BUILD | 89 | ||||
-rw-r--r-- | benchmarks/harness/__init__.py | 25 | ||||
-rw-r--r-- | benchmarks/harness/benchmark_driver.py | 85 | ||||
-rw-r--r-- | benchmarks/harness/container.py | 181 | ||||
-rw-r--r-- | benchmarks/harness/machine.py | 224 | ||||
-rw-r--r-- | benchmarks/harness/machine_mocks/BUILD | 9 | ||||
-rw-r--r-- | benchmarks/harness/machine_mocks/__init__.py | 81 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/BUILD | 40 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/__init__.py | 13 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/gcloud_mock_recorder.py | 97 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/machine_producer.py | 30 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/mock_producer.py | 31 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/yaml_producer.py | 106 | ||||
-rw-r--r-- | benchmarks/harness/ssh_connection.py | 111 | ||||
-rw-r--r-- | benchmarks/harness/tunnel_dispatcher.py | 122 |
15 files changed, 1244 insertions, 0 deletions
diff --git a/benchmarks/harness/BUILD b/benchmarks/harness/BUILD new file mode 100644 index 000000000..9546220c4 --- /dev/null +++ b/benchmarks/harness/BUILD @@ -0,0 +1,89 @@ +load("//benchmarks:defs.bzl", "py_library", "requirement") + +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "harness", + srcs = ["__init__.py"], +) + +py_library( + name = "benchmark_driver", + srcs = ["benchmark_driver.py"], + deps = [ + "//benchmarks/harness/machine_mocks", + "//benchmarks/harness/machine_producers:machine_producer", + "//benchmarks/suites", + ], +) + +py_library( + name = "container", + srcs = ["container.py"], + deps = [ + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) + +py_library( + name = "machine", + srcs = ["machine.py"], + deps = [ + "//benchmarks/harness", + "//benchmarks/harness:container", + "//benchmarks/harness:ssh_connection", + "//benchmarks/harness:tunnel_dispatcher", + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) + +py_library( + name = "ssh_connection", + srcs = ["ssh_connection.py"], + deps = [ + "//benchmarks/harness", + requirement("bcrypt", False), + requirement("cffi", False), + requirement("paramiko", True), + requirement("cryptography", False), + ], +) + +py_library( + name = "tunnel_dispatcher", + srcs = ["tunnel_dispatcher.py"], + deps = [ + requirement("asn1crypto", False), + requirement("chardet", False), + requirement("certifi", False), + requirement("docker", True), + requirement("docker-pycreds", False), + requirement("idna", False), + requirement("pexpect", True), + requirement("ptyprocess", False), + requirement("requests", False), + requirement("urllib3", False), + requirement("websocket-client", False), + ], +) diff --git a/benchmarks/harness/__init__.py b/benchmarks/harness/__init__.py new file mode 100644 index 000000000..a7f34da9e --- /dev/null +++ b/benchmarks/harness/__init__.py @@ -0,0 +1,25 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Core benchmark utilities.""" + +import os + +# LOCAL_WORKLOADS_PATH defines the path to use for local workloads. This is a +# format string that accepts a single string parameter. +LOCAL_WORKLOADS_PATH = os.path.join( + os.path.dirname(__file__), "../workloads/{}") + +# REMOTE_WORKLOADS_PATH defines the path to use for storing the workloads on the +# remote host. This is a format string that accepts a single string parameter. +REMOTE_WORKLOADS_PATH = "workloads/{}" diff --git a/benchmarks/harness/benchmark_driver.py b/benchmarks/harness/benchmark_driver.py new file mode 100644 index 000000000..9abc21b54 --- /dev/null +++ b/benchmarks/harness/benchmark_driver.py @@ -0,0 +1,85 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Main driver for benchmarks.""" + +import copy +import statistics +import threading +import types + +from benchmarks import suites +from benchmarks.harness.machine_producers import machine_producer + + +# pylint: disable=too-many-instance-attributes +class BenchmarkDriver: + """Allocates machines and invokes a benchmark method.""" + + def __init__(self, + producer: machine_producer.MachineProducer, + method: types.FunctionType, + runs: int = 1, + **kwargs): + + self._producer = producer + self._method = method + self._kwargs = copy.deepcopy(kwargs) + self._threads = [] + self.lock = threading.RLock() + self._runs = runs + self._metric_results = {} + + def start(self): + """Starts a benchmark thread.""" + for _ in range(self._runs): + thread = threading.Thread(target=self._run_method) + thread.start() + self._threads.append(thread) + + def join(self): + """Joins the thread.""" + # pylint: disable=expression-not-assigned + [t.join() for t in self._threads] + + def _run_method(self): + """Runs all benchmarks.""" + machines = self._producer.get_machines( + suites.benchmark_machines(self._method)) + try: + result = self._method(*machines, **self._kwargs) + for name, res in result: + with self.lock: + if name in self._metric_results: + self._metric_results[name].append(res) + else: + self._metric_results[name] = [res] + finally: + # Always release. + self._producer.release_machines(machines) + + def median(self): + """Returns the median result, after join is finished.""" + for key, value in self._metric_results.items(): + yield key, [statistics.median(value)] + + def all(self): + """Returns all results.""" + for key, value in self._metric_results.items(): + yield key, value + + def meanstd(self): + """Returns all results.""" + for key, value in self._metric_results.items(): + mean = statistics.mean(value) + yield key, [mean, statistics.stdev(value, xbar=mean)] diff --git a/benchmarks/harness/container.py b/benchmarks/harness/container.py new file mode 100644 index 000000000..585436e20 --- /dev/null +++ b/benchmarks/harness/container.py @@ -0,0 +1,181 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Container definitions.""" + +import contextlib +import logging +import pydoc +import types +from typing import Tuple + +import docker +import docker.errors + +from benchmarks import workloads + + +class Container: + """Abstract container. + + Must be a context manager. + + Usage: + + with Container(client, image, ...): + ... + """ + + def run(self, **env) -> str: + """Run the container synchronously.""" + raise NotImplementedError + + def detach(self, **env): + """Run the container asynchronously.""" + raise NotImplementedError + + def address(self) -> Tuple[str, int]: + """Return the bound address for the container.""" + raise NotImplementedError + + def get_names(self) -> types.GeneratorType: + """Return names of all containers.""" + raise NotImplementedError + + +# pylint: disable=too-many-instance-attributes +class DockerContainer(Container): + """Class that handles creating a docker container.""" + + # pylint: disable=too-many-arguments + def __init__(self, + client: docker.DockerClient, + host: str, + image: str, + count: int = 1, + runtime: str = "runc", + port: int = 0, + **kwargs): + """Trys to setup "count" containers. + + Args: + client: A docker client from dockerpy. + host: The host address the image is running on. + image: The name of the image to run. + count: The number of containers to setup. + runtime: The container runtime to use. + port: The port to reserve. + **kwargs: Additional container options. + """ + assert count >= 1 + assert port == 0 or count == 1 + self._client = client + self._host = host + self._containers = [] + self._count = count + self._image = image + self._runtime = runtime + self._port = port + self._kwargs = kwargs + if port != 0: + self._ports = {"%d/tcp" % port: None} + else: + self._ports = {} + + @contextlib.contextmanager + def detach(self, **env): + env = ["%s=%s" % (key, value) for (key, value) in env.items()] + # Start all containers. + for _ in range(self._count): + try: + # Start the container in a detached mode. + container = self._client.containers.run( + self._image, + detach=True, + remove=True, + runtime=self._runtime, + ports=self._ports, + environment=env, + **self._kwargs) + logging.info("Started detached container %s -> %s", self._image, + container.attrs["Id"]) + self._containers.append(container) + except Exception as exc: + self._clean_containers() + raise exc + try: + # Wait for all containers to be up. + for container in self._containers: + while not container.attrs["State"]["Running"]: + container = self._client.containers.get(container.attrs["Id"]) + yield self + finally: + self._clean_containers() + + def address(self) -> Tuple[str, int]: + assert self._count == 1 + assert self._port != 0 + container = self._client.containers.get(self._containers[0].attrs["Id"]) + port = container.attrs["NetworkSettings"]["Ports"][ + "%d/tcp" % self._port][0]["HostPort"] + return (self._host, port) + + def get_names(self) -> types.GeneratorType: + for container in self._containers: + yield container.name + + def run(self, **env) -> str: + env = ["%s=%s" % (key, value) for (key, value) in env.items()] + return self._client.containers.run( + self._image, + runtime=self._runtime, + ports=self._ports, + remove=True, + environment=env, + **self._kwargs).decode("utf-8") + + def _clean_containers(self): + """Kills all containers.""" + for container in self._containers: + try: + container.kill() + except docker.errors.NotFound: + pass + + +class MockContainer(Container): + """Mock of Container.""" + + def __init__(self, workload: str): + self._workload = workload + + def __enter__(self): + return self + + def run(self, **env): + # Lookup sample data if any exists for the workload module. We use a + # well-defined test locate and a well-defined sample function. + mod = pydoc.locate(workloads.__name__ + "." + self._workload) + if hasattr(mod, "sample"): + return mod.sample(**env) + return "" # No output. + + def address(self) -> Tuple[str, int]: + return ("example.com", 80) + + def get_names(self) -> types.GeneratorType: + yield "mock" + + @contextlib.contextmanager + def detach(self, **env): + yield self diff --git a/benchmarks/harness/machine.py b/benchmarks/harness/machine.py new file mode 100644 index 000000000..66b719b63 --- /dev/null +++ b/benchmarks/harness/machine.py @@ -0,0 +1,224 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Machine abstraction passed to benchmarks to run docker containers. + +Abstraction for interacting with test machines. Machines are produced +by Machine producers and represent a local or remote machine. Benchmark +methods in /benchmarks/suite are passed the required number of machines in order +to run the benchmark. Machines contain methods to run commands via bash, +possibly over ssh. Machines also hold a connection to the docker UNIX socket +to run contianers. + + Typical usage example: + + machine = Machine() + machine.run(cmd) + machine.pull(path) + container = machine.container() +""" + +import logging +import re +import subprocess +import time +from typing import Tuple + +import docker + +from benchmarks import harness +from benchmarks.harness import container +from benchmarks.harness import machine_mocks +from benchmarks.harness import ssh_connection +from benchmarks.harness import tunnel_dispatcher + + +class Machine(object): + """The machine object is the primary object for benchmarks. + + Machine objects are passed to each metric function call and benchmarks use + machines to access real connections to those machines. + + Attributes: + _name: Name as a string + """ + _name = "" + + def run(self, cmd: str) -> Tuple[str, str]: + """Convenience method for running a bash command on a machine object. + + Some machines may point to the local machine, and thus, do not have ssh + connections. Run runs a command either local or over ssh and returns the + output stdout and stderr as strings. + + Args: + cmd: The command to run as a string. + + Returns: + The command output. + """ + raise NotImplementedError + + def read(self, path: str) -> str: + """Reads the contents of some file. + + This will be mocked. + + Args: + path: The path to the file to be read. + + Returns: + The file contents. + """ + raise NotImplementedError + + def pull(self, workload: str) -> str: + """Send the given workload to the machine, build and tag it. + + All images must be defined by the workloads directory. + + Args: + workload: The workload name. + + Returns: + The workload tag. + """ + raise NotImplementedError + + def container(self, image: str, **kwargs) -> container.Container: + """Returns a container object. + + Args: + image: The pulled image tag. + **kwargs: Additional container options. + + Returns: + :return: a container.Container object. + """ + raise NotImplementedError + + def sleep(self, amount: float): + """Sleeps the given amount of time.""" + time.sleep(amount) + + def __str__(self): + return self._name + + +class MockMachine(Machine): + """A mocked machine.""" + _name = "mock" + + def run(self, cmd: str) -> Tuple[str, str]: + return "", "" + + def read(self, path: str) -> str: + return machine_mocks.Readfile(path) + + def pull(self, workload: str) -> str: + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + return container.MockContainer(image) + + def sleep(self, amount: float): + pass + + +def get_address(machine: Machine) -> str: + """Return a machine's default address.""" + default_route, _ = machine.run("ip route get 8.8.8.8") + return re.search(" src ([0-9.]+) ", default_route).group(1) + + +class LocalMachine(Machine): + """The local machine. + + Attributes: + _name: Name as a string + _docker_client: a pythonic connection to to the local dockerd unix socket. + See: https://github.com/docker/docker-py + """ + + def __init__(self, name): + self._name = name + self._docker_client = docker.from_env() + + def run(self, cmd: str) -> Tuple[str, str]: + process = subprocess.Popen( + cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + return stdout.decode("utf-8"), stderr.decode("utf-8") + + def read(self, path: str) -> str: + # Read the exact path locally. + return open(path, "r").read() + + def pull(self, workload: str) -> str: + # Run the docker build command locally. + logging.info("Building %s@%s locally...", workload, self._name) + self.run("docker build --tag={} {}".format( + workload, harness.LOCAL_WORKLOADS_PATH.format(workload))) + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + # Return a local docker container directly. + return container.DockerContainer(self._docker_client, get_address(self), + image, **kwargs) + + def sleep(self, amount: float): + time.sleep(amount) + + +class RemoteMachine(Machine): + """Remote machine accessible via an SSH connection. + + Attributes: + _name: Name as a string + _ssh_connection: a paramiko backed ssh connection which can be used to run + commands on this machine + _tunnel: a python wrapper around a port forwarded ssh connection between a + local unix socket and the remote machine's dockerd unix socket. + _docker_client: a pythonic wrapper backed by the _tunnel. Allows sending + docker commands: see https://github.com/docker/docker-py + """ + + def __init__(self, name, **kwargs): + self._name = name + self._ssh_connection = ssh_connection.SSHConnection(name, **kwargs) + self._tunnel = tunnel_dispatcher.Tunnel(name, **kwargs) + self._tunnel.connect() + self._docker_client = self._tunnel.get_docker_client() + + def run(self, cmd: str) -> Tuple[str, str]: + return self._ssh_connection.run(cmd) + + def read(self, path: str) -> str: + # Just cat remotely. + stdout, stderr = self._ssh_connection.run("cat '{}'".format(path)) + return stdout + stderr + + def pull(self, workload: str) -> str: + # Push to the remote machine and build. + logging.info("Building %s@%s remotely...", workload, self._name) + remote_path = self._ssh_connection.send_workload(workload) + self.run("docker build --tag={} {}".format(workload, remote_path)) + return workload # Workload is the tag. + + def container(self, image: str, **kwargs) -> container.Container: + # Return a remote docker container. + return container.DockerContainer(self._docker_client, get_address(self), + image, **kwargs) + + def sleep(self, amount: float): + time.sleep(amount) diff --git a/benchmarks/harness/machine_mocks/BUILD b/benchmarks/harness/machine_mocks/BUILD new file mode 100644 index 000000000..c8ec4bc79 --- /dev/null +++ b/benchmarks/harness/machine_mocks/BUILD @@ -0,0 +1,9 @@ +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "machine_mocks", + srcs = ["__init__.py"], +) diff --git a/benchmarks/harness/machine_mocks/__init__.py b/benchmarks/harness/machine_mocks/__init__.py new file mode 100644 index 000000000..00f0085d7 --- /dev/null +++ b/benchmarks/harness/machine_mocks/__init__.py @@ -0,0 +1,81 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Machine mock files.""" + +MEMINFO = """\ +MemTotal: 7652344 kB +MemFree: 7174724 kB +MemAvailable: 7152008 kB +Buffers: 7544 kB +Cached: 178856 kB +SwapCached: 0 kB +Active: 270928 kB +Inactive: 68436 kB +Active(anon): 153124 kB +Inactive(anon): 880 kB +Active(file): 117804 kB +Inactive(file): 67556 kB +Unevictable: 0 kB +Mlocked: 0 kB +SwapTotal: 0 kB +SwapFree: 0 kB +Dirty: 900 kB +Writeback: 0 kB +AnonPages: 153000 kB +Mapped: 129120 kB +Shmem: 1044 kB +Slab: 60864 kB +SReclaimable: 22792 kB +SUnreclaim: 38072 kB +KernelStack: 2672 kB +PageTables: 5756 kB +NFS_Unstable: 0 kB +Bounce: 0 kB +WritebackTmp: 0 kB +CommitLimit: 3826172 kB +Committed_AS: 663836 kB +VmallocTotal: 34359738367 kB +VmallocUsed: 0 kB +VmallocChunk: 0 kB +HardwareCorrupted: 0 kB +AnonHugePages: 0 kB +ShmemHugePages: 0 kB +ShmemPmdMapped: 0 kB +CmaTotal: 0 kB +CmaFree: 0 kB +HugePages_Total: 0 +HugePages_Free: 0 +HugePages_Rsvd: 0 +HugePages_Surp: 0 +Hugepagesize: 2048 kB +DirectMap4k: 94196 kB +DirectMap2M: 4624384 kB +DirectMap1G: 3145728 kB +""" + +CONTENTS = { + "/proc/meminfo": MEMINFO, +} + + +def Readfile(path: str) -> str: + """Reads a mock file. + + Args: + path: The target path. + + Returns: + Mocked file contents or None. + """ + return CONTENTS.get(path, None) diff --git a/benchmarks/harness/machine_producers/BUILD b/benchmarks/harness/machine_producers/BUILD new file mode 100644 index 000000000..a48da02a1 --- /dev/null +++ b/benchmarks/harness/machine_producers/BUILD @@ -0,0 +1,40 @@ +load("//benchmarks:defs.bzl", "py_library", "requirement") + +package( + default_visibility = ["//benchmarks:__subpackages__"], + licenses = ["notice"], +) + +py_library( + name = "harness", + srcs = ["__init__.py"], +) + +py_library( + name = "machine_producer", + srcs = ["machine_producer.py"], +) + +py_library( + name = "mock_producer", + srcs = ["mock_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:machine_producer", + ], +) + +py_library( + name = "yaml_producer", + srcs = ["yaml_producer.py"], + deps = [ + "//benchmarks/harness:machine", + "//benchmarks/harness/machine_producers:machine_producer", + requirement("PyYAML", False), + ], +) + +py_library( + name = "gcloud_mock_recorder", + srcs = ["gcloud_mock_recorder.py"], +) diff --git a/benchmarks/harness/machine_producers/__init__.py b/benchmarks/harness/machine_producers/__init__.py new file mode 100644 index 000000000..634ef4843 --- /dev/null +++ b/benchmarks/harness/machine_producers/__init__.py @@ -0,0 +1,13 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/benchmarks/harness/machine_producers/gcloud_mock_recorder.py b/benchmarks/harness/machine_producers/gcloud_mock_recorder.py new file mode 100644 index 000000000..fd9837a37 --- /dev/null +++ b/benchmarks/harness/machine_producers/gcloud_mock_recorder.py @@ -0,0 +1,97 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A recorder and replay for testing the GCloudProducer. + +MockPrinter and MockReader handle printing and reading mock data for the +purposes of testing. MockPrinter is passed to GCloudProducer objects. The user +can then run scenarios and record them for playback in tests later. + +MockReader is passed to MockGcloudProducer objects and handles reading the +previously recorded mock data. + +It is left to the user to check if data printed is properly redacted for their +own use. The intended usecase for this class is data coming from gcloud +commands, which will contain public IPs and other instance data. + +The data format is json and printed/read from the ./test_data directory. The +data is the output of subprocess.CompletedProcess objects in json format. + + Typical usage example: + + recorder = MockPrinter() + producer = GCloudProducer(args, recorder) + machines = producer.get_machines(1) + with open("my_file.json") as fd: + recorder.write_out(fd) + + reader = MockReader(filename) + producer = MockGcloudProducer(args, mock) + machines = producer.get_machines(1) + assert len(machines) == 1 +""" + +import io +import json +import subprocess + + +class MockPrinter(object): + """Handles printing Mock data for MockGcloudProducer. + + Attributes: + _records: list of json object records for printing + """ + + def __init__(self): + self._records = [] + + def record(self, entry: subprocess.CompletedProcess): + """Records data and strips out ip addresses.""" + + record = { + "args": entry.args, + "stdout": entry.stdout.decode("utf-8"), + "returncode": str(entry.returncode) + } + self._records.append(record) + + def write_out(self, fd: io.FileIO): + """Prints out the data into the given filepath.""" + fd.write(json.dumps(self._records, indent=4)) + + +class MockReader(object): + """Handles reading Mock data for MockGcloudProducer. + + Attributes: + _records: List[json] records read from the passed in file. + """ + + def __init__(self, filepath: str): + with open(filepath, "rb") as file: + self._records = json.loads(file.read()) + self._i = 0 + + def __iter__(self): + return self + + def __next__(self, args) -> subprocess.CompletedProcess: + """Returns the next record as a CompletedProcess.""" + if self._i < len(self._records): + record = self._records[self._i] + stdout = record["stdout"].encode("ascii") + returncode = int(record["returncode"]) + return subprocess.CompletedProcess( + args=args, returncode=returncode, stdout=stdout, stderr=b"") + raise StopIteration() diff --git a/benchmarks/harness/machine_producers/machine_producer.py b/benchmarks/harness/machine_producers/machine_producer.py new file mode 100644 index 000000000..124ee14cc --- /dev/null +++ b/benchmarks/harness/machine_producers/machine_producer.py @@ -0,0 +1,30 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Abstract types.""" + +from typing import List + +from benchmarks.harness import machine + + +class MachineProducer: + """Abstract Machine producer.""" + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + """Returns the requested number of machines.""" + raise NotImplementedError + + def release_machines(self, machine_list: List[machine.Machine]): + """Releases the given set of machines.""" + raise NotImplementedError diff --git a/benchmarks/harness/machine_producers/mock_producer.py b/benchmarks/harness/machine_producers/mock_producer.py new file mode 100644 index 000000000..4f29ad53f --- /dev/null +++ b/benchmarks/harness/machine_producers/mock_producer.py @@ -0,0 +1,31 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Producers of mocks.""" + +from typing import List + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import machine_producer + + +class MockMachineProducer(machine_producer.MachineProducer): + """Produces MockMachine objects.""" + + def get_machines(self, num_machines: int) -> List[machine.MockMachine]: + """Returns the request number of MockMachines.""" + return [machine.MockMachine() for i in range(num_machines)] + + def release_machines(self, machine_list: List[machine.MockMachine]): + """No-op.""" + return diff --git a/benchmarks/harness/machine_producers/yaml_producer.py b/benchmarks/harness/machine_producers/yaml_producer.py new file mode 100644 index 000000000..5d334e480 --- /dev/null +++ b/benchmarks/harness/machine_producers/yaml_producer.py @@ -0,0 +1,106 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Producers based on yaml files.""" + +import os +import threading +from typing import Dict +from typing import List + +import yaml + +from benchmarks.harness import machine +from benchmarks.harness.machine_producers import machine_producer + + +class YamlMachineProducer(machine_producer.MachineProducer): + """Loads machines from a yaml file.""" + + def __init__(self, path: str): + self.machines = build_machines(path) + self.max_machines = len(self.machines) + self.machine_condition = threading.Condition() + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + if num_machines > self.max_machines: + raise ValueError( + "Insufficient Ammount of Machines. {ask} asked for and have {max_num} max." + .format(ask=num_machines, max_num=self.max_machines)) + + with self.machine_condition: + while not self._enough_machines(num_machines): + self.machine_condition.wait(timeout=1) + return [self.machines.pop(0) for _ in range(num_machines)] + + def release_machines(self, machine_list: List[machine.Machine]): + with self.machine_condition: + while machine_list: + next_machine = machine_list.pop() + self.machines.append(next_machine) + self.machine_condition.notify() + + def _enough_machines(self, ask: int): + return ask <= len(self.machines) + + +def build_machines(path: str, num_machines: str = -1) -> List[machine.Machine]: + """Builds machine objects defined by the yaml file "path". + + Args: + path: The path to a yaml file which defines machines. + num_machines: Optional limit on how many machine objects to build. + + Returns: + Machine objects in a list. + + If num_machines is set, len(machines) <= num_machines. + """ + data = parse_yaml(path) + machines = [] + for key, value in data.items(): + if len(machines) == num_machines: + return machines + if isinstance(value, dict): + machines.append(machine.RemoteMachine(key, **value)) + else: + machines.append(machine.LocalMachine(key)) + return machines + + +def parse_yaml(path: str) -> Dict[str, Dict[str, str]]: + """Parse the yaml file pointed by path. + + Args: + path: The path to yaml file. + + Returns: + The contents of the yaml file as a dictionary. + """ + data = get_file_contents(path) + return yaml.load(data, Loader=yaml.Loader) + + +def get_file_contents(path: str) -> str: + """Dumps the file contents to a string and returns them. + + Args: + path: The path to dump. + + Returns: + The file contents as a string. + """ + if not os.path.isabs(path): + path = os.path.abspath(path) + with open(path) as input_file: + return input_file.read() diff --git a/benchmarks/harness/ssh_connection.py b/benchmarks/harness/ssh_connection.py new file mode 100644 index 000000000..fcbfbcdb2 --- /dev/null +++ b/benchmarks/harness/ssh_connection.py @@ -0,0 +1,111 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""SSHConnection handles the details of SSH connections.""" + +import os +import warnings + +import paramiko + +from benchmarks import harness + +# Get rid of paramiko Cryptography Warnings. +warnings.filterwarnings(action="ignore", module=".*paramiko.*") + + +def send_one_file(client: paramiko.SSHClient, path: str, remote_dir: str): + """Sends a single file via an SSH client. + + Args: + client: The existing SSH client. + path: The local path. + remote_dir: The remote directory. + """ + filename = path.split("/").pop() + client.exec_command("mkdir -p " + remote_dir) + with client.open_sftp() as ftp_client: + ftp_client.put(path, os.path.join(remote_dir, filename)) + + +class SSHConnection: + """SSH connection to a remote machine.""" + + def __init__(self, name: str, hostname: str, key_path: str, username: str, + **kwargs): + """Sets up a paramiko ssh connection to the given hostname.""" + self._name = name # Unused. + self._hostname = hostname + self._username = username + self._key_path = key_path # RSA Key path + self._kwargs = kwargs + # SSHConnection wraps paramiko. paramiko supports RSA, ECDSA, and Ed25519 + # keys, and we've chosen to only suport and require RSA keys. paramiko + # supports RSA keys that begin with '----BEGIN RSAKEY----'. + # https://stackoverflow.com/questions/53600581/ssh-key-generated-by-ssh-keygen-is-not-recognized-by-paramiko + self.rsa_key = self._rsa() + self.run("true") # Validate. + + def _client(self) -> paramiko.SSHClient: + """Returns a connected SSH client.""" + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect( + hostname=self._hostname, + port=22, + username=self._username, + pkey=self.rsa_key, + allow_agent=False, + look_for_keys=False) + return client + + def _rsa(self): + if "key_password" in self._kwargs: + password = self._kwargs["key_password"] + else: + password = None + rsa = paramiko.RSAKey.from_private_key_file(self._key_path, password) + return rsa + + def run(self, cmd: str) -> (str, str): + """Runs a command via ssh. + + Args: + cmd: The shell command to run. + + Returns: + The contents of stdout and stderr. + """ + with self._client() as client: + _, stdout, stderr = client.exec_command(command=cmd) + stdout.channel.recv_exit_status() + stdout = stdout.read().decode("utf-8") + stderr = stderr.read().decode("utf-8") + return stdout, stderr + + def send_workload(self, name: str) -> str: + """Sends a workload to the remote machine. + + Args: + name: The workload name. + + Returns: + The remote path. + """ + with self._client() as client: + for dirpath, _, filenames in os.walk( + harness.LOCAL_WORKLOADS_PATH.format(name)): + for filename in filenames: + send_one_file(client, os.path.join(dirpath, filename), + harness.REMOTE_WORKLOADS_PATH.format(name)) + return harness.REMOTE_WORKLOADS_PATH.format(name) diff --git a/benchmarks/harness/tunnel_dispatcher.py b/benchmarks/harness/tunnel_dispatcher.py new file mode 100644 index 000000000..c56fd022a --- /dev/null +++ b/benchmarks/harness/tunnel_dispatcher.py @@ -0,0 +1,122 @@ +# python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tunnel handles setting up connections to remote machines. + +Tunnel dispatcher is a wrapper around the connection from a local UNIX socket +and a remote UNIX socket via SSH with port forwarding. This is done to +initialize the pythonic dockerpy client to run containers on the remote host by +connecting to /var/run/docker.sock (where Docker is listening). Tunnel +dispatcher sets up the local UNIX socket and calls the `ssh` command as a +subprocess, and holds a reference to that subprocess. It manages clean-up on +exit as best it can by killing the ssh subprocess and deleting the local UNIX +socket,stored in /tmp for easy cleanup in most systems if this fails. + + Typical usage example: + + t = Tunnel(name, **kwargs) + t.connect() + client = t.get_docker_client() # + client.containers.run("ubuntu", "echo hello world") + +""" + +import os +import tempfile +import time + +import docker +import pexpect + +SSH_TUNNEL_COMMAND = """ssh + -o GlobalKnownHostsFile=/dev/null + -o UserKnownHostsFile=/dev/null + -o StrictHostKeyChecking=no + -o IdentitiesOnly=yes + -nNT -L {filename}:/var/run/docker.sock + -i {key_path} + {username}@{hostname}""" + + +class Tunnel(object): + """The tunnel object represents the tunnel via ssh. + + This connects a local unix domain socket with a remote socket. + + Attributes: + _filename: a temporary name of the UNIX socket prefixed by the name + argument. + _hostname: the IP or resolvable hostname of the remote host. + _username: the username of the ssh_key used to run ssh. + _key_path: path to a valid key. + _key_password: optional password to the ssh key in _key_path + _process: holds reference to the ssh subprocess created. + + Returns: + The new minimum port. + + Raises: + ConnectionError: If no available port is found. + """ + + def __init__(self, + name: str, + hostname: str, + username: str, + key_path: str, + key_password: str = "", + **kwargs): + self._filename = tempfile.NamedTemporaryFile(prefix=name).name + self._hostname = hostname + self._username = username + self._key_path = key_path + self._key_password = key_password + self._kwargs = kwargs + self._process = None + + def connect(self): + """Connects the SSH tunnel and stores the subprocess reference in _process.""" + cmd = SSH_TUNNEL_COMMAND.format( + filename=self._filename, + key_path=self._key_path, + username=self._username, + hostname=self._hostname) + self._process = pexpect.spawn(cmd, timeout=10) + + # If given a password, assume we'll be asked for it. + if self._key_password: + self._process.expect(["Enter passphrase for key .*: "]) + self._process.sendline(self._key_password) + + while True: + # Wait for the tunnel to appear. + if self._process.exitstatus is not None: + raise ConnectionError("Error in setting up ssh tunnel") + if os.path.exists(self._filename): + return + time.sleep(0.1) + + def path(self): + """Return the socket file.""" + return self._filename + + def get_docker_client(self): + """Returns a docker client for this Tunnel.""" + return docker.DockerClient(base_url="unix:/" + self._filename) + + def __del__(self): + """Closes the ssh connection process and deletes the socket file.""" + if self._process: + self._process.close() + if os.path.exists(self._filename): + os.remove(self._filename) |