path: root/benchmarks/harness
diff options
Diffstat (limited to 'benchmarks/harness')
15 files changed, 1244 insertions, 0 deletions
diff --git a/benchmarks/harness/BUILD b/benchmarks/harness/BUILD
new file mode 100644
index 000000000..9546220c4
--- /dev/null
+++ b/benchmarks/harness/BUILD
@@ -0,0 +1,89 @@
+load("//benchmarks:defs.bzl", "py_library", "requirement")
+ default_visibility = ["//benchmarks:__subpackages__"],
+ licenses = ["notice"],
+ name = "harness",
+ srcs = [""],
+ name = "benchmark_driver",
+ srcs = [""],
+ deps = [
+ "//benchmarks/harness/machine_mocks",
+ "//benchmarks/harness/machine_producers:machine_producer",
+ "//benchmarks/suites",
+ ],
+ name = "container",
+ srcs = [""],
+ deps = [
+ requirement("asn1crypto", False),
+ requirement("chardet", False),
+ requirement("certifi", False),
+ requirement("docker", True),
+ requirement("docker-pycreds", False),
+ requirement("idna", False),
+ requirement("ptyprocess", False),
+ requirement("requests", False),
+ requirement("urllib3", False),
+ requirement("websocket-client", False),
+ ],
+ name = "machine",
+ srcs = [""],
+ deps = [
+ "//benchmarks/harness",
+ "//benchmarks/harness:container",
+ "//benchmarks/harness:ssh_connection",
+ "//benchmarks/harness:tunnel_dispatcher",
+ requirement("asn1crypto", False),
+ requirement("chardet", False),
+ requirement("certifi", False),
+ requirement("docker", True),
+ requirement("docker-pycreds", False),
+ requirement("idna", False),
+ requirement("ptyprocess", False),
+ requirement("requests", False),
+ requirement("urllib3", False),
+ requirement("websocket-client", False),
+ ],
+ name = "ssh_connection",
+ srcs = [""],
+ deps = [
+ "//benchmarks/harness",
+ requirement("bcrypt", False),
+ requirement("cffi", False),
+ requirement("paramiko", True),
+ requirement("cryptography", False),
+ ],
+ name = "tunnel_dispatcher",
+ srcs = [""],
+ deps = [
+ requirement("asn1crypto", False),
+ requirement("chardet", False),
+ requirement("certifi", False),
+ requirement("docker", True),
+ requirement("docker-pycreds", False),
+ requirement("idna", False),
+ requirement("pexpect", True),
+ requirement("ptyprocess", False),
+ requirement("requests", False),
+ requirement("urllib3", False),
+ requirement("websocket-client", False),
+ ],
diff --git a/benchmarks/harness/ b/benchmarks/harness/
new file mode 100644
index 000000000..a7f34da9e
--- /dev/null
+++ b/benchmarks/harness/
@@ -0,0 +1,25 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Core benchmark utilities."""
+import os
+# LOCAL_WORKLOADS_PATH defines the path to use for local workloads. This is a
+# format string that accepts a single string parameter.
+LOCAL_WORKLOADS_PATH = os.path.join(
+ os.path.dirname(__file__), "../workloads/{}")
+# REMOTE_WORKLOADS_PATH defines the path to use for storing the workloads on the
+# remote host. This is a format string that accepts a single string parameter.
+REMOTE_WORKLOADS_PATH = "workloads/{}"
diff --git a/benchmarks/harness/ b/benchmarks/harness/
new file mode 100644
index 000000000..9abc21b54
--- /dev/null
+++ b/benchmarks/harness/
@@ -0,0 +1,85 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Main driver for benchmarks."""
+import copy
+import statistics
+import threading
+import types
+from benchmarks import suites
+from benchmarks.harness.machine_producers import machine_producer
+# pylint: disable=too-many-instance-attributes
+class BenchmarkDriver:
+ """Allocates machines and invokes a benchmark method."""
+ def __init__(self,
+ producer: machine_producer.MachineProducer,
+ method: types.FunctionType,
+ runs: int = 1,
+ **kwargs):
+ self._producer = producer
+ self._method = method
+ self._kwargs = copy.deepcopy(kwargs)
+ self._threads = []
+ self.lock = threading.RLock()
+ self._runs = runs
+ self._metric_results = {}
+ def start(self):
+ """Starts a benchmark thread."""
+ for _ in range(self._runs):
+ thread = threading.Thread(target=self._run_method)
+ thread.start()
+ self._threads.append(thread)
+ def join(self):
+ """Joins the thread."""
+ # pylint: disable=expression-not-assigned
+ [t.join() for t in self._threads]
+ def _run_method(self):
+ """Runs all benchmarks."""
+ machines = self._producer.get_machines(
+ suites.benchmark_machines(self._method))
+ try:
+ result = self._method(*machines, **self._kwargs)
+ for name, res in result:
+ with self.lock:
+ if name in self._metric_results:
+ self._metric_results[name].append(res)
+ else:
+ self._metric_results[name] = [res]
+ finally:
+ # Always release.
+ self._producer.release_machines(machines)
+ def median(self):
+ """Returns the median result, after join is finished."""
+ for key, value in self._metric_results.items():
+ yield key, [statistics.median(value)]
+ def all(self):
+ """Returns all results."""
+ for key, value in self._metric_results.items():
+ yield key, value
+ def meanstd(self):
+ """Returns all results."""
+ for key, value in self._metric_results.items():
+ mean = statistics.mean(value)
+ yield key, [mean, statistics.stdev(value, xbar=mean)]
diff --git a/benchmarks/harness/ b/benchmarks/harness/
new file mode 100644
index 000000000..585436e20
--- /dev/null
+++ b/benchmarks/harness/
@@ -0,0 +1,181 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Container definitions."""
+import contextlib
+import logging
+import pydoc
+import types
+from typing import Tuple
+import docker
+import docker.errors
+from benchmarks import workloads
+class Container:
+ """Abstract container.
+ Must be a context manager.
+ Usage:
+ with Container(client, image, ...):
+ ...
+ """
+ def run(self, **env) -> str:
+ """Run the container synchronously."""
+ raise NotImplementedError
+ def detach(self, **env):
+ """Run the container asynchronously."""
+ raise NotImplementedError
+ def address(self) -> Tuple[str, int]:
+ """Return the bound address for the container."""
+ raise NotImplementedError
+ def get_names(self) -> types.GeneratorType:
+ """Return names of all containers."""
+ raise NotImplementedError
+# pylint: disable=too-many-instance-attributes
+class DockerContainer(Container):
+ """Class that handles creating a docker container."""
+ # pylint: disable=too-many-arguments
+ def __init__(self,
+ client: docker.DockerClient,
+ host: str,
+ image: str,
+ count: int = 1,
+ runtime: str = "runc",
+ port: int = 0,
+ **kwargs):
+ """Trys to setup "count" containers.
+ Args:
+ client: A docker client from dockerpy.
+ host: The host address the image is running on.
+ image: The name of the image to run.
+ count: The number of containers to setup.
+ runtime: The container runtime to use.
+ port: The port to reserve.
+ **kwargs: Additional container options.
+ """
+ assert count >= 1
+ assert port == 0 or count == 1
+ self._client = client
+ self._host = host
+ self._containers = []
+ self._count = count
+ self._image = image
+ self._runtime = runtime
+ self._port = port
+ self._kwargs = kwargs
+ if port != 0:
+ self._ports = {"%d/tcp" % port: None}
+ else:
+ self._ports = {}
+ @contextlib.contextmanager
+ def detach(self, **env):
+ env = ["%s=%s" % (key, value) for (key, value) in env.items()]
+ # Start all containers.
+ for _ in range(self._count):
+ try:
+ # Start the container in a detached mode.
+ container =
+ self._image,
+ detach=True,
+ remove=True,
+ runtime=self._runtime,
+ ports=self._ports,
+ environment=env,
+ **self._kwargs)
+"Started detached container %s -> %s", self._image,
+ container.attrs["Id"])
+ self._containers.append(container)
+ except Exception as exc:
+ self._clean_containers()
+ raise exc
+ try:
+ # Wait for all containers to be up.
+ for container in self._containers:
+ while not container.attrs["State"]["Running"]:
+ container = self._client.containers.get(container.attrs["Id"])
+ yield self
+ finally:
+ self._clean_containers()
+ def address(self) -> Tuple[str, int]:
+ assert self._count == 1
+ assert self._port != 0
+ container = self._client.containers.get(self._containers[0].attrs["Id"])
+ port = container.attrs["NetworkSettings"]["Ports"][
+ "%d/tcp" % self._port][0]["HostPort"]
+ return (self._host, port)
+ def get_names(self) -> types.GeneratorType:
+ for container in self._containers:
+ yield
+ def run(self, **env) -> str:
+ env = ["%s=%s" % (key, value) for (key, value) in env.items()]
+ return
+ self._image,
+ runtime=self._runtime,
+ ports=self._ports,
+ remove=True,
+ environment=env,
+ **self._kwargs).decode("utf-8")
+ def _clean_containers(self):
+ """Kills all containers."""
+ for container in self._containers:
+ try:
+ container.kill()
+ except docker.errors.NotFound:
+ pass
+class MockContainer(Container):
+ """Mock of Container."""
+ def __init__(self, workload: str):
+ self._workload = workload
+ def __enter__(self):
+ return self
+ def run(self, **env):
+ # Lookup sample data if any exists for the workload module. We use a
+ # well-defined test locate and a well-defined sample function.
+ mod = pydoc.locate(workloads.__name__ + "." + self._workload)
+ if hasattr(mod, "sample"):
+ return mod.sample(**env)
+ return "" # No output.
+ def address(self) -> Tuple[str, int]:
+ return ("", 80)
+ def get_names(self) -> types.GeneratorType:
+ yield "mock"
+ @contextlib.contextmanager
+ def detach(self, **env):
+ yield self
diff --git a/benchmarks/harness/ b/benchmarks/harness/
new file mode 100644
index 000000000..66b719b63
--- /dev/null
+++ b/benchmarks/harness/
@@ -0,0 +1,224 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Machine abstraction passed to benchmarks to run docker containers.
+Abstraction for interacting with test machines. Machines are produced
+by Machine producers and represent a local or remote machine. Benchmark
+methods in /benchmarks/suite are passed the required number of machines in order
+to run the benchmark. Machines contain methods to run commands via bash,
+possibly over ssh. Machines also hold a connection to the docker UNIX socket
+to run contianers.
+ Typical usage example:
+ machine = Machine()
+ machine.pull(path)
+ container = machine.container()
+import logging
+import re
+import subprocess
+import time
+from typing import Tuple
+import docker
+from benchmarks import harness
+from benchmarks.harness import container
+from benchmarks.harness import machine_mocks
+from benchmarks.harness import ssh_connection
+from benchmarks.harness import tunnel_dispatcher
+class Machine(object):
+ """The machine object is the primary object for benchmarks.
+ Machine objects are passed to each metric function call and benchmarks use
+ machines to access real connections to those machines.
+ Attributes:
+ _name: Name as a string
+ """
+ _name = ""
+ def run(self, cmd: str) -> Tuple[str, str]:
+ """Convenience method for running a bash command on a machine object.
+ Some machines may point to the local machine, and thus, do not have ssh
+ connections. Run runs a command either local or over ssh and returns the
+ output stdout and stderr as strings.
+ Args:
+ cmd: The command to run as a string.
+ Returns:
+ The command output.
+ """
+ raise NotImplementedError
+ def read(self, path: str) -> str:
+ """Reads the contents of some file.
+ This will be mocked.
+ Args:
+ path: The path to the file to be read.
+ Returns:
+ The file contents.
+ """
+ raise NotImplementedError
+ def pull(self, workload: str) -> str:
+ """Send the given workload to the machine, build and tag it.
+ All images must be defined by the workloads directory.
+ Args:
+ workload: The workload name.
+ Returns:
+ The workload tag.
+ """
+ raise NotImplementedError
+ def container(self, image: str, **kwargs) -> container.Container:
+ """Returns a container object.
+ Args:
+ image: The pulled image tag.
+ **kwargs: Additional container options.
+ Returns:
+ :return: a container.Container object.
+ """
+ raise NotImplementedError
+ def sleep(self, amount: float):
+ """Sleeps the given amount of time."""
+ time.sleep(amount)
+ def __str__(self):
+ return self._name
+class MockMachine(Machine):
+ """A mocked machine."""
+ _name = "mock"
+ def run(self, cmd: str) -> Tuple[str, str]:
+ return "", ""
+ def read(self, path: str) -> str:
+ return machine_mocks.Readfile(path)
+ def pull(self, workload: str) -> str:
+ return workload # Workload is the tag.
+ def container(self, image: str, **kwargs) -> container.Container:
+ return container.MockContainer(image)
+ def sleep(self, amount: float):
+ pass
+def get_address(machine: Machine) -> str:
+ """Return a machine's default address."""
+ default_route, _ ="ip route get")
+ return" src ([0-9.]+) ", default_route).group(1)
+class LocalMachine(Machine):
+ """The local machine.
+ Attributes:
+ _name: Name as a string
+ _docker_client: a pythonic connection to to the local dockerd unix socket.
+ See:
+ """
+ def __init__(self, name):
+ self._name = name
+ self._docker_client = docker.from_env()
+ def run(self, cmd: str) -> Tuple[str, str]:
+ process = subprocess.Popen(
+ cmd.split(" "), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ return stdout.decode("utf-8"), stderr.decode("utf-8")
+ def read(self, path: str) -> str:
+ # Read the exact path locally.
+ return open(path, "r").read()
+ def pull(self, workload: str) -> str:
+ # Run the docker build command locally.
+"Building %s@%s locally...", workload, self._name)
+"docker build --tag={} {}".format(
+ workload, harness.LOCAL_WORKLOADS_PATH.format(workload)))
+ return workload # Workload is the tag.
+ def container(self, image: str, **kwargs) -> container.Container:
+ # Return a local docker container directly.
+ return container.DockerContainer(self._docker_client, get_address(self),
+ image, **kwargs)
+ def sleep(self, amount: float):
+ time.sleep(amount)
+class RemoteMachine(Machine):
+ """Remote machine accessible via an SSH connection.
+ Attributes:
+ _name: Name as a string
+ _ssh_connection: a paramiko backed ssh connection which can be used to run
+ commands on this machine
+ _tunnel: a python wrapper around a port forwarded ssh connection between a
+ local unix socket and the remote machine's dockerd unix socket.
+ _docker_client: a pythonic wrapper backed by the _tunnel. Allows sending
+ docker commands: see
+ """
+ def __init__(self, name, **kwargs):
+ self._name = name
+ self._ssh_connection = ssh_connection.SSHConnection(name, **kwargs)
+ self._tunnel = tunnel_dispatcher.Tunnel(name, **kwargs)
+ self._tunnel.connect()
+ self._docker_client = self._tunnel.get_docker_client()
+ def run(self, cmd: str) -> Tuple[str, str]:
+ return
+ def read(self, path: str) -> str:
+ # Just cat remotely.
+ stdout, stderr ="cat '{}'".format(path))
+ return stdout + stderr
+ def pull(self, workload: str) -> str:
+ # Push to the remote machine and build.
+"Building %s@%s remotely...", workload, self._name)
+ remote_path = self._ssh_connection.send_workload(workload)
+"docker build --tag={} {}".format(workload, remote_path))
+ return workload # Workload is the tag.
+ def container(self, image: str, **kwargs) -> container.Container:
+ # Return a remote docker container.
+ return container.DockerContainer(self._docker_client, get_address(self),
+ image, **kwargs)
+ def sleep(self, amount: float):
+ time.sleep(amount)
diff --git a/benchmarks/harness/machine_mocks/BUILD b/benchmarks/harness/machine_mocks/BUILD
new file mode 100644
index 000000000..c8ec4bc79
--- /dev/null
+++ b/benchmarks/harness/machine_mocks/BUILD
@@ -0,0 +1,9 @@
+ default_visibility = ["//benchmarks:__subpackages__"],
+ licenses = ["notice"],
+ name = "machine_mocks",
+ srcs = [""],
diff --git a/benchmarks/harness/machine_mocks/ b/benchmarks/harness/machine_mocks/
new file mode 100644
index 000000000..00f0085d7
--- /dev/null
+++ b/benchmarks/harness/machine_mocks/
@@ -0,0 +1,81 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Machine mock files."""
+MEMINFO = """\
+MemTotal: 7652344 kB
+MemFree: 7174724 kB
+MemAvailable: 7152008 kB
+Buffers: 7544 kB
+Cached: 178856 kB
+SwapCached: 0 kB
+Active: 270928 kB
+Inactive: 68436 kB
+Active(anon): 153124 kB
+Inactive(anon): 880 kB
+Active(file): 117804 kB
+Inactive(file): 67556 kB
+Unevictable: 0 kB
+Mlocked: 0 kB
+SwapTotal: 0 kB
+SwapFree: 0 kB
+Dirty: 900 kB
+Writeback: 0 kB
+AnonPages: 153000 kB
+Mapped: 129120 kB
+Shmem: 1044 kB
+Slab: 60864 kB
+SReclaimable: 22792 kB
+SUnreclaim: 38072 kB
+KernelStack: 2672 kB
+PageTables: 5756 kB
+NFS_Unstable: 0 kB
+Bounce: 0 kB
+WritebackTmp: 0 kB
+CommitLimit: 3826172 kB
+Committed_AS: 663836 kB
+VmallocTotal: 34359738367 kB
+VmallocUsed: 0 kB
+VmallocChunk: 0 kB
+HardwareCorrupted: 0 kB
+AnonHugePages: 0 kB
+ShmemHugePages: 0 kB
+ShmemPmdMapped: 0 kB
+CmaTotal: 0 kB
+CmaFree: 0 kB
+HugePages_Total: 0
+HugePages_Free: 0
+HugePages_Rsvd: 0
+HugePages_Surp: 0
+Hugepagesize: 2048 kB
+DirectMap4k: 94196 kB
+DirectMap2M: 4624384 kB
+DirectMap1G: 3145728 kB
+ "/proc/meminfo": MEMINFO,
+def Readfile(path: str) -> str:
+ """Reads a mock file.
+ Args:
+ path: The target path.
+ Returns:
+ Mocked file contents or None.
+ """
+ return CONTENTS.get(path, None)
diff --git a/benchmarks/harness/machine_producers/BUILD b/benchmarks/harness/machine_producers/BUILD
new file mode 100644
index 000000000..a48da02a1
--- /dev/null
+++ b/benchmarks/harness/machine_producers/BUILD
@@ -0,0 +1,40 @@
+load("//benchmarks:defs.bzl", "py_library", "requirement")
+ default_visibility = ["//benchmarks:__subpackages__"],
+ licenses = ["notice"],
+ name = "harness",
+ srcs = [""],
+ name = "machine_producer",
+ srcs = [""],
+ name = "mock_producer",
+ srcs = [""],
+ deps = [
+ "//benchmarks/harness:machine",
+ "//benchmarks/harness/machine_producers:machine_producer",
+ ],
+ name = "yaml_producer",
+ srcs = [""],
+ deps = [
+ "//benchmarks/harness:machine",
+ "//benchmarks/harness/machine_producers:machine_producer",
+ requirement("PyYAML", False),
+ ],
+ name = "gcloud_mock_recorder",
+ srcs = [""],
diff --git a/benchmarks/harness/machine_producers/ b/benchmarks/harness/machine_producers/
new file mode 100644
index 000000000..634ef4843
--- /dev/null
+++ b/benchmarks/harness/machine_producers/
@@ -0,0 +1,13 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/benchmarks/harness/machine_producers/ b/benchmarks/harness/machine_producers/
new file mode 100644
index 000000000..fd9837a37
--- /dev/null
+++ b/benchmarks/harness/machine_producers/
@@ -0,0 +1,97 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""A recorder and replay for testing the GCloudProducer.
+MockPrinter and MockReader handle printing and reading mock data for the
+purposes of testing. MockPrinter is passed to GCloudProducer objects. The user
+can then run scenarios and record them for playback in tests later.
+MockReader is passed to MockGcloudProducer objects and handles reading the
+previously recorded mock data.
+It is left to the user to check if data printed is properly redacted for their
+own use. The intended usecase for this class is data coming from gcloud
+commands, which will contain public IPs and other instance data.
+The data format is json and printed/read from the ./test_data directory. The
+data is the output of subprocess.CompletedProcess objects in json format.
+ Typical usage example:
+ recorder = MockPrinter()
+ producer = GCloudProducer(args, recorder)
+ machines = producer.get_machines(1)
+ with open("my_file.json") as fd:
+ recorder.write_out(fd)
+ reader = MockReader(filename)
+ producer = MockGcloudProducer(args, mock)
+ machines = producer.get_machines(1)
+ assert len(machines) == 1
+import io
+import json
+import subprocess
+class MockPrinter(object):
+ """Handles printing Mock data for MockGcloudProducer.
+ Attributes:
+ _records: list of json object records for printing
+ """
+ def __init__(self):
+ self._records = []
+ def record(self, entry: subprocess.CompletedProcess):
+ """Records data and strips out ip addresses."""
+ record = {
+ "args": entry.args,
+ "stdout": entry.stdout.decode("utf-8"),
+ "returncode": str(entry.returncode)
+ }
+ self._records.append(record)
+ def write_out(self, fd: io.FileIO):
+ """Prints out the data into the given filepath."""
+ fd.write(json.dumps(self._records, indent=4))
+class MockReader(object):
+ """Handles reading Mock data for MockGcloudProducer.
+ Attributes:
+ _records: List[json] records read from the passed in file.
+ """
+ def __init__(self, filepath: str):
+ with open(filepath, "rb") as file:
+ self._records = json.loads(
+ self._i = 0
+ def __iter__(self):
+ return self
+ def __next__(self, args) -> subprocess.CompletedProcess:
+ """Returns the next record as a CompletedProcess."""
+ if self._i < len(self._records):
+ record = self._records[self._i]
+ stdout = record["stdout"].encode("ascii")
+ returncode = int(record["returncode"])
+ return subprocess.CompletedProcess(
+ args=args, returncode=returncode, stdout=stdout, stderr=b"")
+ raise StopIteration()
diff --git a/benchmarks/harness/machine_producers/ b/benchmarks/harness/machine_producers/
new file mode 100644
index 000000000..124ee14cc
--- /dev/null
+++ b/benchmarks/harness/machine_producers/
@@ -0,0 +1,30 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Abstract types."""
+from typing import List
+from benchmarks.harness import machine
+class MachineProducer:
+ """Abstract Machine producer."""
+ def get_machines(self, num_machines: int) -> List[machine.Machine]:
+ """Returns the requested number of machines."""
+ raise NotImplementedError
+ def release_machines(self, machine_list: List[machine.Machine]):
+ """Releases the given set of machines."""
+ raise NotImplementedError
diff --git a/benchmarks/harness/machine_producers/ b/benchmarks/harness/machine_producers/
new file mode 100644
index 000000000..4f29ad53f
--- /dev/null
+++ b/benchmarks/harness/machine_producers/
@@ -0,0 +1,31 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Producers of mocks."""
+from typing import List
+from benchmarks.harness import machine
+from benchmarks.harness.machine_producers import machine_producer
+class MockMachineProducer(machine_producer.MachineProducer):
+ """Produces MockMachine objects."""
+ def get_machines(self, num_machines: int) -> List[machine.MockMachine]:
+ """Returns the request number of MockMachines."""
+ return [machine.MockMachine() for i in range(num_machines)]
+ def release_machines(self, machine_list: List[machine.MockMachine]):
+ """No-op."""
+ return
diff --git a/benchmarks/harness/machine_producers/ b/benchmarks/harness/machine_producers/
new file mode 100644
index 000000000..5d334e480
--- /dev/null
+++ b/benchmarks/harness/machine_producers/
@@ -0,0 +1,106 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Producers based on yaml files."""
+import os
+import threading
+from typing import Dict
+from typing import List
+import yaml
+from benchmarks.harness import machine
+from benchmarks.harness.machine_producers import machine_producer
+class YamlMachineProducer(machine_producer.MachineProducer):
+ """Loads machines from a yaml file."""
+ def __init__(self, path: str):
+ self.machines = build_machines(path)
+ self.max_machines = len(self.machines)
+ self.machine_condition = threading.Condition()
+ def get_machines(self, num_machines: int) -> List[machine.Machine]:
+ if num_machines > self.max_machines:
+ raise ValueError(
+ "Insufficient Ammount of Machines. {ask} asked for and have {max_num} max."
+ .format(ask=num_machines, max_num=self.max_machines))
+ with self.machine_condition:
+ while not self._enough_machines(num_machines):
+ self.machine_condition.wait(timeout=1)
+ return [self.machines.pop(0) for _ in range(num_machines)]
+ def release_machines(self, machine_list: List[machine.Machine]):
+ with self.machine_condition:
+ while machine_list:
+ next_machine = machine_list.pop()
+ self.machines.append(next_machine)
+ self.machine_condition.notify()
+ def _enough_machines(self, ask: int):
+ return ask <= len(self.machines)
+def build_machines(path: str, num_machines: str = -1) -> List[machine.Machine]:
+ """Builds machine objects defined by the yaml file "path".
+ Args:
+ path: The path to a yaml file which defines machines.
+ num_machines: Optional limit on how many machine objects to build.
+ Returns:
+ Machine objects in a list.
+ If num_machines is set, len(machines) <= num_machines.
+ """
+ data = parse_yaml(path)
+ machines = []
+ for key, value in data.items():
+ if len(machines) == num_machines:
+ return machines
+ if isinstance(value, dict):
+ machines.append(machine.RemoteMachine(key, **value))
+ else:
+ machines.append(machine.LocalMachine(key))
+ return machines
+def parse_yaml(path: str) -> Dict[str, Dict[str, str]]:
+ """Parse the yaml file pointed by path.
+ Args:
+ path: The path to yaml file.
+ Returns:
+ The contents of the yaml file as a dictionary.
+ """
+ data = get_file_contents(path)
+ return yaml.load(data, Loader=yaml.Loader)
+def get_file_contents(path: str) -> str:
+ """Dumps the file contents to a string and returns them.
+ Args:
+ path: The path to dump.
+ Returns:
+ The file contents as a string.
+ """
+ if not os.path.isabs(path):
+ path = os.path.abspath(path)
+ with open(path) as input_file:
+ return
diff --git a/benchmarks/harness/ b/benchmarks/harness/
new file mode 100644
index 000000000..fcbfbcdb2
--- /dev/null
+++ b/benchmarks/harness/
@@ -0,0 +1,111 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""SSHConnection handles the details of SSH connections."""
+import os
+import warnings
+import paramiko
+from benchmarks import harness
+# Get rid of paramiko Cryptography Warnings.
+warnings.filterwarnings(action="ignore", module=".*paramiko.*")
+def send_one_file(client: paramiko.SSHClient, path: str, remote_dir: str):
+ """Sends a single file via an SSH client.
+ Args:
+ client: The existing SSH client.
+ path: The local path.
+ remote_dir: The remote directory.
+ """
+ filename = path.split("/").pop()
+ client.exec_command("mkdir -p " + remote_dir)
+ with client.open_sftp() as ftp_client:
+ ftp_client.put(path, os.path.join(remote_dir, filename))
+class SSHConnection:
+ """SSH connection to a remote machine."""
+ def __init__(self, name: str, hostname: str, key_path: str, username: str,
+ **kwargs):
+ """Sets up a paramiko ssh connection to the given hostname."""
+ self._name = name # Unused.
+ self._hostname = hostname
+ self._username = username
+ self._key_path = key_path # RSA Key path
+ self._kwargs = kwargs
+ # SSHConnection wraps paramiko. paramiko supports RSA, ECDSA, and Ed25519
+ # keys, and we've chosen to only suport and require RSA keys. paramiko
+ # supports RSA keys that begin with '----BEGIN RSAKEY----'.
+ #
+ self.rsa_key = self._rsa()
+"true") # Validate.
+ def _client(self) -> paramiko.SSHClient:
+ """Returns a connected SSH client."""
+ client = paramiko.SSHClient()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+ client.connect(
+ hostname=self._hostname,
+ port=22,
+ username=self._username,
+ pkey=self.rsa_key,
+ allow_agent=False,
+ look_for_keys=False)
+ return client
+ def _rsa(self):
+ if "key_password" in self._kwargs:
+ password = self._kwargs["key_password"]
+ else:
+ password = None
+ rsa = paramiko.RSAKey.from_private_key_file(self._key_path, password)
+ return rsa
+ def run(self, cmd: str) -> (str, str):
+ """Runs a command via ssh.
+ Args:
+ cmd: The shell command to run.
+ Returns:
+ The contents of stdout and stderr.
+ """
+ with self._client() as client:
+ _, stdout, stderr = client.exec_command(command=cmd)
+ stdout ="utf-8")
+ stderr ="utf-8")
+ return stdout, stderr
+ def send_workload(self, name: str) -> str:
+ """Sends a workload to the remote machine.
+ Args:
+ name: The workload name.
+ Returns:
+ The remote path.
+ """
+ with self._client() as client:
+ for dirpath, _, filenames in os.walk(
+ harness.LOCAL_WORKLOADS_PATH.format(name)):
+ for filename in filenames:
+ send_one_file(client, os.path.join(dirpath, filename),
+ harness.REMOTE_WORKLOADS_PATH.format(name))
+ return harness.REMOTE_WORKLOADS_PATH.format(name)
diff --git a/benchmarks/harness/ b/benchmarks/harness/
new file mode 100644
index 000000000..c56fd022a
--- /dev/null
+++ b/benchmarks/harness/
@@ -0,0 +1,122 @@
+# python3
+# Copyright 2019 Google LLC
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tunnel handles setting up connections to remote machines.
+Tunnel dispatcher is a wrapper around the connection from a local UNIX socket
+and a remote UNIX socket via SSH with port forwarding. This is done to
+initialize the pythonic dockerpy client to run containers on the remote host by
+connecting to /var/run/docker.sock (where Docker is listening). Tunnel
+dispatcher sets up the local UNIX socket and calls the `ssh` command as a
+subprocess, and holds a reference to that subprocess. It manages clean-up on
+exit as best it can by killing the ssh subprocess and deleting the local UNIX
+socket,stored in /tmp for easy cleanup in most systems if this fails.
+ Typical usage example:
+ t = Tunnel(name, **kwargs)
+ t.connect()
+ client = t.get_docker_client() #
+"ubuntu", "echo hello world")
+import os
+import tempfile
+import time
+import docker
+import pexpect
+ -o GlobalKnownHostsFile=/dev/null
+ -o UserKnownHostsFile=/dev/null
+ -o StrictHostKeyChecking=no
+ -o IdentitiesOnly=yes
+ -nNT -L {filename}:/var/run/docker.sock
+ -i {key_path}
+ {username}@{hostname}"""
+class Tunnel(object):
+ """The tunnel object represents the tunnel via ssh.
+ This connects a local unix domain socket with a remote socket.
+ Attributes:
+ _filename: a temporary name of the UNIX socket prefixed by the name
+ argument.
+ _hostname: the IP or resolvable hostname of the remote host.
+ _username: the username of the ssh_key used to run ssh.
+ _key_path: path to a valid key.
+ _key_password: optional password to the ssh key in _key_path
+ _process: holds reference to the ssh subprocess created.
+ Returns:
+ The new minimum port.
+ Raises:
+ ConnectionError: If no available port is found.
+ """
+ def __init__(self,
+ name: str,
+ hostname: str,
+ username: str,
+ key_path: str,
+ key_password: str = "",
+ **kwargs):
+ self._filename = tempfile.NamedTemporaryFile(prefix=name).name
+ self._hostname = hostname
+ self._username = username
+ self._key_path = key_path
+ self._key_password = key_password
+ self._kwargs = kwargs
+ self._process = None
+ def connect(self):
+ """Connects the SSH tunnel and stores the subprocess reference in _process."""
+ cmd = SSH_TUNNEL_COMMAND.format(
+ filename=self._filename,
+ key_path=self._key_path,
+ username=self._username,
+ hostname=self._hostname)
+ self._process = pexpect.spawn(cmd, timeout=10)
+ # If given a password, assume we'll be asked for it.
+ if self._key_password:
+ self._process.expect(["Enter passphrase for key .*: "])
+ self._process.sendline(self._key_password)
+ while True:
+ # Wait for the tunnel to appear.
+ if self._process.exitstatus is not None:
+ raise ConnectionError("Error in setting up ssh tunnel")
+ if os.path.exists(self._filename):
+ return
+ time.sleep(0.1)
+ def path(self):
+ """Return the socket file."""
+ return self._filename
+ def get_docker_client(self):
+ """Returns a docker client for this Tunnel."""
+ return docker.DockerClient(base_url="unix:/" + self._filename)
+ def __del__(self):
+ """Closes the ssh connection process and deletes the socket file."""
+ if self._process:
+ self._process.close()
+ if os.path.exists(self._filename):
+ os.remove(self._filename)