diff options
Diffstat (limited to 'benchmarks/harness')
-rw-r--r-- | benchmarks/harness/__init__.py | 9 | ||||
-rw-r--r-- | benchmarks/harness/machine.py | 11 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/gcloud_producer.py | 70 | ||||
-rw-r--r-- | benchmarks/harness/machine_producers/machine_producer.py | 21 | ||||
-rw-r--r-- | benchmarks/harness/ssh_connection.py | 9 |
5 files changed, 84 insertions, 36 deletions
diff --git a/benchmarks/harness/__init__.py b/benchmarks/harness/__init__.py index a7f34da9e..61fd25f73 100644 --- a/benchmarks/harness/__init__.py +++ b/benchmarks/harness/__init__.py @@ -13,13 +13,20 @@ # limitations under the License. """Core benchmark utilities.""" +import getpass import os # LOCAL_WORKLOADS_PATH defines the path to use for local workloads. This is a # format string that accepts a single string parameter. LOCAL_WORKLOADS_PATH = os.path.join( - os.path.dirname(__file__), "../workloads/{}") + os.path.dirname(__file__), "../workloads/{}/tar.tar") # REMOTE_WORKLOADS_PATH defines the path to use for storing the workloads on the # remote host. This is a format string that accepts a single string parameter. REMOTE_WORKLOADS_PATH = "workloads/{}" + +# DEFAULT_USER is the default user running this script. +DEFAULT_USER = getpass.getuser() + +# DEFAULT_USER_HOME is the home directory of the user running the script. +DEFAULT_USER_HOME = os.environ["HOME"] if "HOME" in os.environ else "" diff --git a/benchmarks/harness/machine.py b/benchmarks/harness/machine.py index 66b719b63..2df4c9e31 100644 --- a/benchmarks/harness/machine.py +++ b/benchmarks/harness/machine.py @@ -160,15 +160,17 @@ class LocalMachine(Machine): stdout, stderr = process.communicate() return stdout.decode("utf-8"), stderr.decode("utf-8") - def read(self, path: str) -> str: + def read(self, path: str) -> bytes: # Read the exact path locally. return open(path, "r").read() def pull(self, workload: str) -> str: # Run the docker build command locally. logging.info("Building %s@%s locally...", workload, self._name) - self.run("docker build --tag={} {}".format( - workload, harness.LOCAL_WORKLOADS_PATH.format(workload))) + with open(harness.LOCAL_WORKLOADS_PATH.format(workload), + "rb") as dockerfile: + self._docker_client.images.build( + fileobj=dockerfile, tag=workload, custom_context=True) return workload # Workload is the tag. def container(self, image: str, **kwargs) -> container.Container: @@ -212,6 +214,9 @@ class RemoteMachine(Machine): # Push to the remote machine and build. logging.info("Building %s@%s remotely...", workload, self._name) remote_path = self._ssh_connection.send_workload(workload) + # Workloads are all tarballs. + self.run("tar -xvf {remote_path}/tar.tar -C {remote_path}".format( + remote_path=remote_path)) self.run("docker build --tag={} {}".format(workload, remote_path)) return workload # Workload is the tag. diff --git a/benchmarks/harness/machine_producers/gcloud_producer.py b/benchmarks/harness/machine_producers/gcloud_producer.py index 4693dd8a2..e0b77d52b 100644 --- a/benchmarks/harness/machine_producers/gcloud_producer.py +++ b/benchmarks/harness/machine_producers/gcloud_producer.py @@ -29,7 +29,6 @@ collisions with user instances shouldn't happen. producer.release_machines(NUM_MACHINES) """ import datetime -import getpass import json import subprocess import threading @@ -40,8 +39,6 @@ from benchmarks.harness import machine from benchmarks.harness.machine_producers import gcloud_mock_recorder from benchmarks.harness.machine_producers import machine_producer -DEFAULT_USER = getpass.getuser() - class GCloudProducer(machine_producer.MachineProducer): """Implementation of MachineProducer backed by GCP. @@ -50,9 +47,10 @@ class GCloudProducer(machine_producer.MachineProducer): Attributes: project: The GCP project name under which to create the machines. - ssh_key_path: path to a valid ssh key. See README on vaild ssh keys. + ssh_key_file: path to a valid ssh private key. See README on vaild ssh keys. image: image name as a string. image_project: image project as a string. + machine_type: type of GCP to create. e.g. n1-standard-4 zone: string to a valid GCP zone. ssh_user: string of user name for ssh_key ssh_password: string of password for ssh key @@ -63,18 +61,22 @@ class GCloudProducer(machine_producer.MachineProducer): def __init__(self, project: str, - ssh_key_path: str, + ssh_key_file: str, image: str, image_project: str, + machine_type: str, zone: str, ssh_user: str, + ssh_password: str, mock: gcloud_mock_recorder.MockPrinter = None): self.project = project - self.ssh_key_path = ssh_key_path + self.ssh_key_file = ssh_key_file self.image = image self.image_project = image_project + self.machine_type = machine_type self.zone = zone - self.ssh_user = ssh_user if ssh_user else DEFAULT_USER + self.ssh_user = ssh_user + self.ssh_password = ssh_password self.mock = mock self.condition = threading.Condition() @@ -86,20 +88,19 @@ class GCloudProducer(machine_producer.MachineProducer): with self.condition: names = self._get_unique_names(num_machines) self._build_instances(names) - instances = self._start_command(names) - self._add_ssh_key_to_instances(names) - return self._machines_from_instances(instances) + instances = self._start_command(names) + self._add_ssh_key_to_instances(names) + return self._machines_from_instances(instances) def release_machines(self, machine_list: List[machine.Machine]): """Releases the requested number of machines, deleting the instances.""" if not machine_list: return - with self.condition: - cmd = "gcloud compute instances delete --quiet".split(" ") - names = [str(m) for m in machine_list] - cmd.extend(names) - cmd.append("--zone={zone}".format(zone=self.zone)) - self._run_command(cmd) + cmd = "gcloud compute instances delete --quiet".split(" ") + names = [str(m) for m in machine_list] + cmd.extend(names) + cmd.append("--zone={zone}".format(zone=self.zone)) + self._run_command(cmd, detach=True) def _machines_from_instances( self, instances: List[Dict[str, Any]]) -> List[machine.Machine]: @@ -111,9 +112,11 @@ class GCloudProducer(machine_producer.MachineProducer): "hostname": instance["networkInterfaces"][0]["accessConfigs"][0]["natIP"], "key_path": - self.ssh_key_path, + self.ssh_key_file, "username": - self.ssh_user + self.ssh_user, + "key_password": + self.ssh_password } machines.append(machine.RemoteMachine(name=name, **kwargs)) return machines @@ -148,12 +151,15 @@ class GCloudProducer(machine_producer.MachineProducer): "_build_instances cannot create instances without names.") cmd = "gcloud compute instances create".split(" ") cmd.extend(names) - cmd.extend("--preemptible --image={image} --zone={zone}".format( - image=self.image, zone=self.zone).split(" ")) + cmd.extend( + "--preemptible --image={image} --zone={zone} --machine-type={machine_type}" + .format( + image=self.image, zone=self.zone, + machine_type=self.machine_type).split(" ")) if self.image_project: cmd.append("--image-project={project}".format(project=self.image_project)) - res = self._run_command(cmd) - return json.loads(res.stdout) + res = self._run_command(cmd) + return json.loads(res.stdout) def _start_command(self, names): """Starts instances using gcloud command. @@ -184,7 +190,7 @@ class GCloudProducer(machine_producer.MachineProducer): Args: names: list of machine names to which to add the ssh-key - self.ssh_key_path. + self.ssh_key_file. Raises: subprocess.CalledProcessError: when underlying subprocess call returns an @@ -193,7 +199,7 @@ class GCloudProducer(machine_producer.MachineProducer): """ for name in names: cmd = "gcloud compute ssh {name}".format(name=name).split(" ") - cmd.append("--ssh-key-file={key}".format(key=self.ssh_key_path)) + cmd.append("--ssh-key-file={key}".format(key=self.ssh_key_file)) cmd.append("--zone={zone}".format(zone=self.zone)) cmd.append("--command=uname") timeout = datetime.timedelta(seconds=5 * 60) @@ -221,7 +227,9 @@ class GCloudProducer(machine_producer.MachineProducer): res = self._run_command(cmd) return json.loads(res.stdout) - def _run_command(self, cmd: List[str]) -> subprocess.CompletedProcess: + def _run_command(self, + cmd: List[str], + detach: bool = False) -> [None, subprocess.CompletedProcess]: """Runs command as a subprocess. Runs command as subprocess and returns the result. @@ -230,14 +238,24 @@ class GCloudProducer(machine_producer.MachineProducer): Args: cmd: command to be run as a list of strings. + detach: if True, run the child process and don't wait for it to return. Returns: - Completed process object to be parsed by caller. + Completed process object to be parsed by caller or None if detach=True. Raises: CalledProcessError: if subprocess.run returns an error. """ cmd = cmd + ["--format=json"] + if detach: + p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if self.mock: + out, _ = p.communicate() + self.mock.record( + subprocess.CompletedProcess( + returncode=p.returncode, stdout=out, args=p.args)) + return + res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if self.mock: self.mock.record(res) diff --git a/benchmarks/harness/machine_producers/machine_producer.py b/benchmarks/harness/machine_producers/machine_producer.py index 124ee14cc..f5591c026 100644 --- a/benchmarks/harness/machine_producers/machine_producer.py +++ b/benchmarks/harness/machine_producers/machine_producer.py @@ -13,6 +13,7 @@ # limitations under the License. """Abstract types.""" +import threading from typing import List from benchmarks.harness import machine @@ -28,3 +29,23 @@ class MachineProducer: def release_machines(self, machine_list: List[machine.Machine]): """Releases the given set of machines.""" raise NotImplementedError + + +class LocalMachineProducer(MachineProducer): + """Produces Local Machines.""" + + def __init__(self, limit: int): + self.limit_sem = threading.Semaphore(value=limit) + + def get_machines(self, num_machines: int) -> List[machine.Machine]: + """Returns the request number of MockMachines.""" + + self.limit_sem.acquire() + return [machine.LocalMachine("local") for _ in range(num_machines)] + + def release_machines(self, machine_list: List[machine.MockMachine]): + """No-op.""" + if not machine_list: + raise ValueError("Cannot release an empty list!") + self.limit_sem.release() + machine_list.clear() diff --git a/benchmarks/harness/ssh_connection.py b/benchmarks/harness/ssh_connection.py index fcbfbcdb2..e0bf258f1 100644 --- a/benchmarks/harness/ssh_connection.py +++ b/benchmarks/harness/ssh_connection.py @@ -94,7 +94,7 @@ class SSHConnection: return stdout, stderr def send_workload(self, name: str) -> str: - """Sends a workload to the remote machine. + """Sends a workload tarball to the remote machine. Args: name: The workload name. @@ -103,9 +103,6 @@ class SSHConnection: The remote path. """ with self._client() as client: - for dirpath, _, filenames in os.walk( - harness.LOCAL_WORKLOADS_PATH.format(name)): - for filename in filenames: - send_one_file(client, os.path.join(dirpath, filename), - harness.REMOTE_WORKLOADS_PATH.format(name)) + send_one_file(client, harness.LOCAL_WORKLOADS_PATH.format(name), + harness.REMOTE_WORKLOADS_PATH.format(name)) return harness.REMOTE_WORKLOADS_PATH.format(name) |