diff --git a/runners/cromwell_on_google/README.md b/runners/cromwell_on_google/README.md new file mode 100644 index 0000000..586283a --- /dev/null +++ b/runners/cromwell_on_google/README.md @@ -0,0 +1,272 @@ +# Run a WDL workflow + +## Overview + +This example demonstrates running a multi-stage workflow on +Google Cloud Platform. + +* The workflow is launched with the Google Genomics [Pipelines API](https://cloud.google.com/genomics/v1alpha2/pipelines). +* The workflow is defined using the Broad Institute's +[Workflow Definition Language](https://software.broadinstitute.org/wdl/) (WDL). +* The workflow stages are orchestrated by the Broad Institute's +[Cromwell](https://github.com/broadinstitute/cromwell). + +When submitted using the Pipelines API, the workflow runs +on multiple [Google Compute Engine](https://cloud.google.com/compute/) +virtual machines. +First a master node is created for Cromwell, and then Cromwell submits +each stage of the workflow as one or more separate pipelines. + +Execution of a running Pipeline proceeds as: + +1. Create Compute Engine virtual machine + +2. On the VM, in a Docker container, execute wdl_runner.py + + a. Run Cromwell (server) + + b. Submit workflow, inputs, and options to Cromwell server + + c. Poll for completion as Cromwell executes: + + 1) Call pipelines.run() to execute call 1 + 2) Poll for completion of call 1 + 3) Call pipelines.run() to execute call 2 + 4) Poll for completion of call 2 + + + d. Copy workflow metadata to output path + + e. Copy workflow outputs to output path + +3. Destroy Compute Engine Virtual machine + +## Setup Overview + +Code packaging for the Pipelines API is done through +[Docker](https://www.docker.com/) images. The instructions provided +here explain how to create your own Docker image, although a copy +of this Docker image has already been built and made available by +the Broad Institute. 
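The submit-and-poll pattern in step 2c above can be sketched in Python. This is a minimal illustration, not code from this repository; `is_done`, `poll_interval`, and `max_wait` are hypothetical names (the real logic lives in `cromwell_driver.py`):

```python
import time


def wait_for_completion(is_done, poll_interval=5, max_wait=60):
    """Poll is_done() until it returns True, or give up after max_wait seconds.

    Mirrors step 2c above: each workflow call is submitted, then polled
    for completion before the workflow moves on to the next call.
    """
    waited = 0
    while not is_done():
        if waited >= max_wait:
            raise RuntimeError("operation did not complete in %s seconds" % max_wait)
        time.sleep(poll_interval)
        waited += poll_interval
    return waited
```

The same loop shape appears twice in the runner: once waiting for the Cromwell server to accept connections, and once waiting for each workflow call to finish.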
+ +### Code summary + +The code in the wdl_runner Docker image includes: + +* [OpenJDK 8](http://openjdk.java.net/projects/jdk8/) runtime engine (JRE) +* [Python 2.7](https://www.python.org/download/releases/2.7/) interpreter +* [Cromwell release 27](https://github.com/broadinstitute/cromwell/releases/tag/27) +* [Python and shell scripts from this repository](./src) + +Take a look at the [Dockerfile](./cromwell_launcher/Dockerfile) for full details. + +## (0) Prerequisites + +1. Clone or fork this repository. + +2. Enable the Genomics, Cloud Storage, and Compute Engine APIs on a new + or existing Google Cloud Project using the [Cloud Console](https://console.cloud.google.com/flows/enableapi?apiid=genomics,storage_component,compute_component&redirect=https://console.cloud.google.com) + +3. Follow the Google Genomics [getting started instructions](https://cloud.google.com/genomics/install-genomics-tools#install-genomics-tools) to install and authorize the Google Cloud SDK. + +4. Follow the Cloud Storage instructions for [Creating Storage Buckets](https://cloud.google.com/storage/docs/creating-buckets) to create a bucket for workflow output and logging + +5. If you plan to create your own Docker images, then +[install docker](https://docs.docker.com/engine/installation/#installation) + +## (1) Create and stage the wdl_runner Docker image + +*If you are going to use the published version of the docker image, +then skip this step.* + +Every Google Cloud project provides a private repository for saving and +serving Docker images called the [Google Container Registry](https://cloud.google.com/container-registry/docs/). + +The following instructions allow you to stage a Docker image in your project's +Container Registry with all necessary code for orchestrating your workflow. + +### (1a) Create the Docker image. 
+ +``` +git clone https://github.com/googlegenomics/pipelines-api-examples.git +cd pipelines-api-examples/wdl_runner/ +docker build -t ${USER}/wdl_runner ./cromwell_launcher +``` + +### (1b) Push the Docker image to a repository. + +In this example, we push the container to +[Google Container Registry](https://cloud.google.com/container-registry/) +via the following commands: + +``` +docker tag ${USER}/wdl_runner gcr.io/YOUR-PROJECT-ID/wdl_runner +gcloud docker -- push gcr.io/YOUR-PROJECT-ID/wdl_runner +``` + +* Replace `YOUR-PROJECT-ID` with your project ID. + +## (2) Run the sample workflow in the cloud + +The file [./workflows/wdl_pipeline.yaml](./workflows/wdl_pipeline.yaml) +defines a pipeline for running WDL workflows. By default, it uses the +docker image built by the Broad Institute from this repository: + +``` +docker: + imageName: gcr.io/broad-dsde-outreach/wdl_runner: +``` + +If you have built your own Docker image, then change the imageName: + +``` +docker: + imageName: gcr.io/YOUR-PROJECT-ID/wdl_runner +``` + +* Replace `YOUR-PROJECT-ID` with your project ID. + +#### Run the following command: + +``` +gcloud \ + alpha genomics pipelines run \ + --pipeline-file workflows/wdl_pipeline.yaml \ + --zones us-central1-f \ + --logging gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/logging \ + --inputs-from-file WDL=workflows/vcf_chr_count/vcf_chr_count.wdl \ + --inputs-from-file WORKFLOW_INPUTS=workflows/vcf_chr_count/vcf_chr_count.sample.inputs.json \ + --inputs-from-file WORKFLOW_OPTIONS=workflows/common/basic.jes.us.options.json \ + --inputs WORKSPACE=gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/workspace \ + --inputs OUTPUTS=gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output +``` + +* Replace `YOUR-BUCKET` with a bucket in your project. + +The output will be an operation ID for the Pipeline. 
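If you launch many workflows, the `gcloud` invocation can be scripted and the operation ID captured programmatically. This is a hedged sketch: the `Running [operations/...]` output format is an assumption about gcloud's behavior (verify it against your installed SDK version), and `launch_pipeline` is a hypothetical helper, not part of this repository:

```python
import re
import subprocess


def parse_operation_id(gcloud_output):
    """Extract 'operations/...' from gcloud output.

    Assumes gcloud reports the new operation as 'Running [operations/...]';
    check your SDK version's actual output format.
    """
    match = re.search(r'\[(operations/[^\]]+)\]', gcloud_output)
    if not match:
        raise ValueError("no operation ID found in: %r" % gcloud_output)
    return match.group(1)


def launch_pipeline(pipeline_file, logging_path, inputs):
    """Run the pipeline with gcloud and return the new operation ID."""
    cmd = ['gcloud', 'alpha', 'genomics', 'pipelines', 'run',
           '--pipeline-file', pipeline_file,
           '--logging', logging_path]
    for name, value in sorted(inputs.items()):
        cmd += ['--inputs', '%s=%s' % (name, value)]
    return parse_operation_id(
        subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode())
```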
+
+## (3) Monitor the pipeline operation
+
+This GitHub repo includes a shell script,
+[./tools/monitor_wdl_pipeline.sh](./tools/monitor_wdl_pipeline.sh),
+for monitoring the status of a pipeline launched using ``wdl_pipeline.yaml``.
+
+```
+$ ./tools/monitor_wdl_pipeline.sh YOUR-NEW-OPERATION-ID
+Logging: gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/logging
+Workspace: gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/workspace
+Outputs: gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output
+
+2016-09-01 09:37:44: operation not complete
+No operations logs found.
+There are 0 output files
+Sleeping 60 seconds
+
+...
+
+2016-09-01 09:40:53: operation not complete
+Calls started but not complete:
+  call-vcf_split
+Sleeping 60 seconds
+
+...
+
+2016-09-01 09:44:02: operation not complete
+Operation logs found:
+  YOUR-NEW-OPERATION-ID.log
+  YOUR-NEW-OPERATION-ID.log
+  YOUR-NEW-OPERATION-ID
+Calls (including shards) completed: 1
+Calls started but not complete:
+  call-vcf_record_count/shard-0
+  call-vcf_record_count/shard-1
+  call-vcf_record_count/shard-2
+Sleeping 60 seconds
+
+...
+
+2016-09-01 09:54:31: operation not complete
+Calls (including shards) completed: 4
+No calls currently in progress.
+  (Transitioning to next stage or copying final output).
+Sleeping 60 seconds
+
+2016-09-01 09:55:34: operation not complete
+Calls (including shards) completed: 4
+Calls started but not complete:
+  call-gather
+Sleeping 60 seconds
+
+2016-09-01 09:56:37: operation not complete
+Calls (including shards) completed: 5
+No calls currently in progress.
+  (Transitioning to next stage or copying final output).
+There are 1 output files +Sleeping 60 seconds + +2016-09-01 09:57:40: operation complete +Completed operation status information + done: true + metadata: + events: + - description: start + startTime: '2016-09-01T16:38:18.215458712Z' + - description: pulling-image + startTime: '2016-09-01T16:38:18.215809129Z' + - description: localizing-files + startTime: '2016-09-01T16:38:42.613937060Z' + - description: running-docker + startTime: '2016-09-01T16:38:42.613978300Z' + - description: delocalizing-files + startTime: '2016-09-01T16:56:42.144127783Z' + - description: ok + startTime: '2016-09-01T16:56:43.725128719Z' + name: operations/YOUR-NEW-OPERATION-ID + gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output/output.txt + gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output/wdl_run_metadata.json + +Preemptions: + None +``` + +## (4) Check the results + +Check the operation output for a top-level `errors` field. +If none, then the operation should have finished successfully. + +## (5) Check that the output exists + +``` +$ gsutil ls -l gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output +TOTAL: 2 objects, 13025 bytes (12.72 KiB) + 46 2016-09-01T16:56:40Z gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output/output.txt + 15069 2016-09-01T16:56:37Z gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output/wdl_run_metadata.json +TOTAL: 2 objects, 15115 bytes (14.76 KiB) +``` + +* Replace `YOUR-BUCKET` with a bucket in your project. + +## (6) Check the output + +``` +$ gsutil cat gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/output/output.txt +chrM.vcf 197 +chrX.vcf 4598814 +chrY.vcf 653100 +``` + +* Replace `YOUR-BUCKET` with a bucket in your project. + +## (7) Clean up the intermediate workspace files + +When Cromwell runs, per-stage output and other intermediate files are +written to the WORKSPACE path you specified in the `gcloud` command above. 
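This cleanup can also be scripted from Python. A minimal sketch, assuming `gsutil` is on the PATH; `workspace_rm_command` is a hypothetical helper introduced here for illustration:

```python
import subprocess


def workspace_rm_command(workspace_path):
    """Build the gsutil command that removes everything under the workspace.

    '-m' parallelizes the deletes; '**' matches objects at any depth.
    """
    return ['gsutil', '-m', 'rm', workspace_path.rstrip('/') + '/**']


def remove_workspace_files(workspace_path):
    """Delete all intermediate Cromwell files under the workspace path."""
    subprocess.check_call(workspace_rm_command(workspace_path))
```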
+ +To remove these files, run: + +``` +gsutil -m rm gs://YOUR-BUCKET/pipelines-api-examples/wdl_runner/workspace/** +``` + +* Replace `YOUR-BUCKET` with a bucket in your project. + diff --git a/runners/cromwell_on_google/cromwell_launcher/Dockerfile b/runners/cromwell_on_google/cromwell_launcher/Dockerfile new file mode 100644 index 0000000..b4ce193 --- /dev/null +++ b/runners/cromwell_on_google/cromwell_launcher/Dockerfile @@ -0,0 +1,59 @@ +# Copyright 2017 Google Inc. +# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +FROM java:openjdk-8-jre + +# Install python +RUN apt-get update && \ + apt-get install python --yes && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Install gcloud +# See https://cloud.google.com/sdk/ +RUN curl https://sdk.cloud.google.com | bash + +# Add the install location explicitly to the path (for non-interactive shells) +ENV PATH /root/google-cloud-sdk/bin:$PATH + +# Install pip for the next two steps... 
+RUN apt-get update && \ + apt-get install python-pip --yes + +# Install Python "requests" (for cromwell_driver.py) package +RUN pip install requests simplejson + +# Install Google Python client (for file_util.py) package +RUN pip install --upgrade google-api-python-client + +# Remove pip +RUN apt-get remove --yes python-pip && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Copy the wdl_runner python, shell script, and dependencies +RUN mkdir /wdl_runner +COPY cromwell_driver.py \ + file_util.py \ + sys_util.py \ + wdl_outputs_util.py \ + wdl_runner.py \ + wdl_runner.sh \ + /wdl_runner/ +RUN chmod u+x /wdl_runner/wdl_runner.sh + +# Copy Cromwell and the Cromwell conf template +RUN mkdir /cromwell +RUN cd /cromwell && \ + curl -L -O https://github.com/broadinstitute/cromwell/releases/download/27/cromwell-27.jar +RUN ln /cromwell/cromwell-27.jar /cromwell/cromwell.jar +COPY jes_template.conf /cromwell/ + +# Set up the runtime environment +ENV CROMWELL /cromwell/cromwell.jar +ENV CROMWELL_CONF /cromwell/jes_template.conf + +WORKDIR /wdl_runner diff --git a/runners/cromwell_on_google/cromwell_launcher/cromwell_driver.py b/runners/cromwell_on_google/cromwell_launcher/cromwell_driver.py new file mode 100644 index 0000000..d2604d4 --- /dev/null +++ b/runners/cromwell_on_google/cromwell_launcher/cromwell_driver.py @@ -0,0 +1,155 @@ +#!/usr/bin/python + +# Copyright 2017 Google Inc. 
+# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +# cromwell_driver.py +# +# This script provides a library interface to Cromwell, namely: +# * Start the Cromwell server +# * Submit execution requests to Cromwell +# * Poll Cromwell for job status + +import logging +import os +import subprocess +import time + +import requests + +import sys_util + + +class CromwellDriver(object): + + def __init__(self, cromwell_conf, cromwell_jar): + self.cromwell_conf = cromwell_conf + self.cromwell_jar = cromwell_jar + + self.cromwell_proc = None + + def start(self): + """Start the Cromwell service.""" + if self.cromwell_proc: + logging.info("Request to start Cromwell: already running") + return + + self.cromwell_proc = subprocess.Popen([ + 'java', + '-Dconfig.file=' + self.cromwell_conf, + '-Xmx4g', + '-jar', self.cromwell_jar, + 'server']) + + logging.info("Started Cromwell") + + def fetch(self, wf_id=None, post=False, files=None, method=None): + url = 'http://localhost:8000/api/workflows/v1' + if wf_id is not None: + url = os.path.join(url, wf_id) + if method is not None: + url = os.path.join(url, method) + if post: + r = requests.post(url, files=files) + else: + r = requests.get(url) + return r.json() + + def submit(self, wdl, workflow_inputs, workflow_options, sleep_time=15): + """Post new job to the server and poll for completion.""" + + # Add required input files + with open(wdl, 'rb') as f: + wdl_source = f.read() + with open(workflow_inputs, 'rb') as f: + wf_inputs = f.read() + + files = { + 'wdlSource': wdl_source, + 'workflowInputs': wf_inputs, + } + + # Add workflow options if specified + if workflow_options: + with open(workflow_options, 'rb') as f: + wf_options = f.read() + files['workflowOptions'] = wf_options + + # After Cromwell start, it may take a few seconds to be ready for requests. + # Poll up to a minute for successful connect and submit. 
+
+    job = None
+    max_time_wait = 60
+    wait_interval = 5
+
+    time.sleep(wait_interval)
+    for attempt in range(max_time_wait/wait_interval):
+      try:
+        job = self.fetch(post=True, files=files)
+        break
+      except requests.exceptions.ConnectionError as e:
+        logging.info("Failed to connect to Cromwell (attempt %d): %s",
+                     attempt + 1, e)
+        time.sleep(wait_interval)
+
+    if not job:
+      sys_util.exit_with_error(
+          "Failed to connect to Cromwell after {0} seconds".format(
+              max_time_wait))
+
+    if job['status'] != 'Submitted':
+      sys_util.exit_with_error(
+          "Job status from Cromwell was not 'Submitted', instead '{0}'".format(
+              job['status']))
+
+    # Job is running.
+    cromwell_id = job['id']
+    logging.info("Job submitted to Cromwell. job id: %s", cromwell_id)
+
+    # Poll Cromwell for job completion.
+    attempt = 0
+    max_failed_attempts = 3
+    while True:
+      time.sleep(sleep_time)
+
+      # Cromwell occasionally fails to respond to the status request.
+      # Only give up after 3 consecutive failed requests.
+      try:
+        status_json = self.fetch(wf_id=cromwell_id, method='status')
+        attempt = 0
+      except requests.exceptions.ConnectionError as e:
+        attempt += 1
+        logging.info("Error polling Cromwell job status (attempt %d): %s",
+                     attempt, e)
+
+        if attempt >= max_failed_attempts:
+          sys_util.exit_with_error(
+              "Cromwell did not respond for %d consecutive requests" % attempt)
+
+        continue
+
+      status = status_json['status']
+      if status == 'Succeeded':
+        break
+      elif status == 'Submitted':
+        pass
+      elif status == 'Running':
+        pass
+      else:
+        sys_util.exit_with_error(
+            "Status of job is not Submitted, Running, or Succeeded: %s" % status)
+
+      logging.info("Cromwell job status: %s", status)
+
+    # Cromwell produces a list of outputs and full job details
+    outputs = self.fetch(wf_id=cromwell_id, method='outputs')
+    metadata = self.fetch(wf_id=cromwell_id, method='metadata')
+
+    return outputs, metadata
+
+
+if __name__ == '__main__':
+  pass
diff --git
a/runners/cromwell_on_google/cromwell_launcher/file_util.py b/runners/cromwell_on_google/cromwell_launcher/file_util.py
new file mode 100644
index 0000000..188a81b
--- /dev/null
+++ b/runners/cromwell_on_google/cromwell_launcher/file_util.py
@@ -0,0 +1,108 @@
+#!/usr/bin/python
+
+# Copyright 2017 Google Inc.
+#
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+# file_util.py
+
+import logging
+import simplejson
+import string
+import subprocess
+
+from googleapiclient import discovery
+from googleapiclient.errors import HttpError
+from oauth2client.client import GoogleCredentials
+
+import sys_util
+
+
+def file_safe_substitute(file_name, mapping):
+  """Performs placeholder replacement on a file's contents and returns the result."""
+
+  with open(file_name, 'rb') as f:
+    file_contents = f.read()
+  return string.Template(file_contents).safe_substitute(mapping)
+
+
+def gsutil_cp(source_files, dest_dir):
+  """Copies files to GCS and exits on error."""
+
+  cp_cmd = ['gsutil', 'cp'] + source_files + [dest_dir]
+
+  logging.info("Copying %s to %s", source_files, dest_dir)
+
+  # Copies can fail, so include retries...
+  for attempt in range(3):
+    p = subprocess.Popen(cp_cmd, stderr=subprocess.PIPE)
+    return_code = p.wait()
+    if not return_code:
+      return
+
+    logging.warn("Copy %s to %s failed: attempt %d",
+                 source_files, dest_dir, attempt)
+
+  sys_util.exit_with_error(
+      "copying files from %s to %s failed: %s" % (
+          source_files, dest_dir, p.stderr.read()))
+
+
+def verify_gcs_dir_empty_or_missing(path):
+  """Verify that the output "directory" does not exist or is empty."""
+
+  # Use the storage API directly instead of gsutil.
+  # gsutil does not return explicit error codes and so to detect
+  # a non-existent path would require capturing and parsing the error message.
+ + # Verify the input is a GCS path + if not path.startswith('gs://'): + sys_util.exit_with_error("Path is not a GCS path: '%s'" % path) + + # Tokenize the path into bucket and prefix + parts = path[len('gs://'):].split('/', 1) + bucket = parts[0] + prefix = parts[1] if len(parts) > 1 else None + + # Get the storage endpoint + credentials = GoogleCredentials.get_application_default() + service = discovery.build('storage', 'v1', credentials=credentials, + cache_discovery=False) + + # Build the request - only need the name + fields = 'nextPageToken,items(name)' + request = service.objects().list( + bucket=bucket, prefix=prefix, fields=fields, maxResults=2) + + # If we get more than 1 item, we are done (directory not empty) + # If we get zero items, we are done (directory empty) + # If we get 1 item, then we need to check if it is a "directory object" + + items = [] + while request and len(items) < 2: + try: + response = request.execute() + except HttpError as err: + error = simplejson.loads(err.content) + error = error['error'] + + sys_util.exit_with_error( + "%s %s: '%s'" % (error['code'], error['message'], path)) + + items.extend(response.get('items', [])) + request = service.objects().list_next(request, response) + + if not items: + return True + + if len(items) == 1 and items[0]['name'].rstrip('/') == prefix.rstrip('/'): + return True + + return False + + +if __name__ == '__main__': + pass + diff --git a/runners/cromwell_on_google/cromwell_launcher/jes_template.conf b/runners/cromwell_on_google/cromwell_launcher/jes_template.conf new file mode 100644 index 0000000..8caf70d --- /dev/null +++ b/runners/cromwell_on_google/cromwell_launcher/jes_template.conf @@ -0,0 +1,82 @@ +# Minimal Cromwell template for using JES + +webservice { + port = 8000 + interface = 0.0.0.0 + instance.name = "cromwell-for-wdl-runner" +} + +akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] +} + +spray.can { + server { + request-timeout = 40s + } + client { + request-timeout = 40s + 
connecting-timeout = 40s + } +} + +backend { + default = "JES" + providers { + JES { + actor-factory = "cromwell.backend.impl.jes.JesBackendLifecycleActorFactory" + config { + project = "${project_id}" + root = "${working_dir}" + + genomics { + # A reference to an auth defined in the 'google' stanza at the top. This auth is used to create + # Pipelines and manipulate auth JSONs. + auth = "application-default" + endpoint-url = "https://genomics.googleapis.com/" + } + + filesystems = { + gcs { + # A reference to a potentially different auth for manipulating files via engine functions. + auth = "application-default" + } + } + + } + } + } +} + +engine { + filesystems { + gcs { + auth = "application-default" + } + } +} + +google { + application-name = "cromwell" + auths = [ + { + name = "application-default" + scheme = "application_default" + } + ] +} + +database { + profile = "slick.jdbc.HsqldbProfile$" + + db { + driver = "org.hsqldb.jdbcDriver" + url = "jdbc:hsqldb:mem:${slick.uniqueSchema};shutdown=false;hsqldb.tx=mvcc" + connectionTimeout = 3000 + } +} + +instrumentation { + use-kamon = false +} + diff --git a/runners/cromwell_on_google/cromwell_launcher/sys_util.py b/runners/cromwell_on_google/cromwell_launcher/sys_util.py new file mode 100644 index 0000000..2b31155 --- /dev/null +++ b/runners/cromwell_on_google/cromwell_launcher/sys_util.py @@ -0,0 +1,38 @@ +#!/usr/bin/python + +# Copyright 2017 Google Inc. 
+# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +# sys_util.py + +import logging +import os +import sys + + +def exit_with_error(err_string): + """Emit the specified error string and exit with an exit code of 1.""" + sys.stderr.write("ERROR: {0}\n".format(err_string)) + sys.exit(1) + + +def copy_from_env(env_vars, environ): + """Returns a dict of required environment variables.""" + + result = {} + for e in env_vars: + val = environ.get(e, None) + if val is None: + exit_with_error("the " + e + " environment variable must be set") + logging.info(e + "->" + os.environ[e]) + result[e] = val + + return result + + +if __name__ == '__main__': + pass + diff --git a/runners/cromwell_on_google/cromwell_launcher/wdl_outputs_util.py b/runners/cromwell_on_google/cromwell_launcher/wdl_outputs_util.py new file mode 100644 index 0000000..57ae8f4 --- /dev/null +++ b/runners/cromwell_on_google/cromwell_launcher/wdl_outputs_util.py @@ -0,0 +1,69 @@ +#!/usr/bin/python + +# Copyright 2017 Google Inc. +# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +# wdl_outputs_util.py +# +# When Cromwell finishes running a workflow on Compute Engine, the +# output files are not in their final location, they are down under +# the workflow's "workspace" path. +# +# The routines in this file can be used to get the list of output files +# to copy. + + +def get_matching_element(value, match_string): + """Returns a list of values which match the given prefix string. + + The input value can be a singleton string, a list, or a dict. + If the input value is a list or dict, this function will be called + recursively (via get_matching_list_values or get_matching_dict_values + respectively). 
+  """
+
+  match_list = list()
+
+  if isinstance(value, list):
+    match_list += get_matching_list_values(value, match_string)
+
+  elif isinstance(value, dict):
+    match_list += get_matching_dict_values(value, match_string)
+
+  elif isinstance(value, unicode) or isinstance(value, str):
+    if value.startswith(match_string):
+      match_list.append(value)
+
+  else:
+    # We don't search floats or bools.
+    pass
+
+  return match_list
+
+
+def get_matching_list_values(l, match_string):
+  """Returns a list of values from a list which match the given string."""
+
+  match_list = list()
+  for value in l:
+    match_list += get_matching_element(value, match_string)
+
+  return match_list
+
+
+def get_matching_dict_values(d, match_string):
+  """Returns a list of values from a dict which match the given string."""
+
+  match_list = list()
+  for value in d.itervalues():
+    match_list += get_matching_element(value, match_string)
+
+  return match_list
+
+
+def get_workflow_output(outputs, working_dir):
+  return get_matching_dict_values(outputs, working_dir)
+
diff --git a/runners/cromwell_on_google/cromwell_launcher/wdl_runner.py b/runners/cromwell_on_google/cromwell_launcher/wdl_runner.py
new file mode 100644
index 0000000..d68b1e5
--- /dev/null
+++ b/runners/cromwell_on_google/cromwell_launcher/wdl_runner.py
@@ -0,0 +1,174 @@
+#!/usr/bin/python
+
+# Copyright 2017 Google Inc.
+#
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file or at
+# https://developers.google.com/open-source/licenses/bsd
+
+# wdl_runner.py
+#
+# This script is a wrapper around Cromwell, which will:
+#
+# 1- take as input:
+#    * a WDL file to describe a workflow
+#    * a JSON file to describe the inputs
+#    * a Google Cloud project ID (ignored if on GCE) to run in
+#    * a GCS path to a Cromwell "working directory"
+#    * a GCS path to an "output directory"
+#
+# 2- launch a local instance of Cromwell
+#
+# 3- submit the inputs to Cromwell to run the workflow
+#
+# 4- copy the Cromwell run metadata ('wdl_run_metadata.json') to the
+#    output directory
+#
+# 5- copy the Cromwell outputs to the output directory
+#
+# Cromwell can be found at:
+# https://github.com/broadinstitute/cromwell

+import argparse
+import json
+import logging
+import os
+import urllib2
+
+import cromwell_driver
+import file_util
+import sys_util
+import wdl_outputs_util
+
+WDL_RUN_METADATA_FILE = 'wdl_run_metadata.json'
+
+
+def gce_get_metadata(path):
+  """Queries the GCE metadata server for the specified value."""
+  req = urllib2.Request(
+      'http://metadata/computeMetadata/v1/%s' % path,
+      None, {'Metadata-Flavor': 'Google'})
+
+  return urllib2.urlopen(req).read()
+
+
+class Runner(object):
+
+  def __init__(self, args, environ):
+    self.args = args
+
+    # Fetch all required environment variables, exiting if unset.
+    self.environ = sys_util.copy_from_env(
+        ['CROMWELL', 'CROMWELL_CONF'], environ)
+    cromwell_conf = self.environ['CROMWELL_CONF']
+    cromwell_jar = self.environ['CROMWELL']
+
+    # Verify that the output directory is empty (or not there).
+    if not file_util.verify_gcs_dir_empty_or_missing(self.args.output_dir):
+      sys_util.exit_with_error(
+          "Output directory not empty: %s" % self.args.output_dir)
+
+    # Plug in the working directory and the project id to the Cromwell conf
+    self.fill_cromwell_conf(cromwell_conf,
+                            self.args.working_dir, self.args.project)
+
+    # Set up the Cromwell driver
+    self.driver = cromwell_driver.CromwellDriver(cromwell_conf, cromwell_jar)
+    self.driver.start()
+
+  def fill_cromwell_conf(self, cromwell_conf, working_dir, project):
+    try:
+      project_id = gce_get_metadata('project/project-id')
+
+      if project and project != project_id:
+        logging.warning("Overriding project ID %s with %s",
+                        project, project_id)
+
+    except urllib2.URLError as e:
+      logging.warning(
+          "URLError trying to fetch project ID from Compute Engine metadata")
+      logging.warning(e)
+      logging.warning("Assuming not running on Compute Engine")
+
+      project_id = project
+
+    new_conf_data = file_util.file_safe_substitute(cromwell_conf, {
+        'project_id': project_id,
+        'working_dir': working_dir
+    })
+
+    with open(cromwell_conf, 'wb') as f:
+      f.write(new_conf_data)
+
+  def copy_workflow_output(self, result):
+    output_files = wdl_outputs_util.get_workflow_output(
+        result['outputs'], self.args.working_dir)
+
+    # Copy final output files (if any)
+    logging.info("Workflow output files = %s", output_files)
+
+    if output_files:
+      file_util.gsutil_cp(output_files, "%s/" % self.args.output_dir)
+
+  def copy_workflow_metadata(self, metadata, metadata_filename):
+
+    logging.info("Copying run metadata to %s", self.args.output_dir)
+
+    # Copy the run metadata
+    with open(metadata_filename, 'w') as f:
+      json.dump(metadata, f)
+
+    file_util.gsutil_cp([metadata_filename], "%s/" % self.args.output_dir)
+
+  def run(self):
+    logging.info("starting")
+
+    # Submit the job to the local Cromwell server
+    (result, metadata) = self.driver.submit(self.args.wdl,
+                                            self.args.workflow_inputs,
+                                            self.args.workflow_options)
+    logging.info(result)
+
+    # Copy run metadata and output files to the output directory
+    self.copy_workflow_metadata(metadata, WDL_RUN_METADATA_FILE)
+    self.copy_workflow_output(result)
+
+    logging.info("run complete")
+
+
+def main():
+  parser = argparse.ArgumentParser(description='Run WDLs')
+  parser.add_argument('--wdl', required=True,
+                      help='The WDL file to run')
+  parser.add_argument('--workflow-inputs', required=True,
+                      help='The workflow inputs (JSON) file')
+  parser.add_argument('--workflow-options', required=False,
+                      help='The workflow options (JSON) file')
+  parser.add_argument('--project', required=False,
+                      help='The Cloud project id')
+  parser.add_argument('--working-dir', required=True,
+                      help='Location for Cromwell to put intermediate results.')
+  parser.add_argument('--output-dir', required=True,
+                      help='Location to store the final results.')
+
+  args = parser.parse_args()
+
+  # Sanitize the working and output paths (str.rstrip returns a new
+  # string, so the result must be assigned back).
+  args.working_dir = args.working_dir.rstrip('/')
+  args.output_dir = args.output_dir.rstrip('/')
+
+  # Write logs at info level
+  FORMAT = '%(asctime)-15s %(module)s %(levelname)s: %(message)s'
+  logging.basicConfig(level=logging.INFO, format=FORMAT)
+
+  # Don't info-log every new connection to localhost, to keep stderr small.
+  logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)
+  logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
+      logging.WARNING)
+
+  runner = Runner(args, os.environ)
+  runner.run()
+
+
+if __name__ == '__main__':
+  main()
diff --git a/runners/cromwell_on_google/cromwell_launcher/wdl_runner.sh b/runners/cromwell_on_google/cromwell_launcher/wdl_runner.sh
new file mode 100644
index 0000000..99481cf
--- /dev/null
+++ b/runners/cromwell_on_google/cromwell_launcher/wdl_runner.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+
+# Copyright 2017 Google Inc.
+# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +set -o errexit +set -o nounset + +readonly INPUT_PATH=/pipeline/input + +# WDL, INPUTS, and OPTIONS file contents are all passed into +# the pipeline as environment variables - write them out as +# files. +mkdir -p "${INPUT_PATH}" +echo "${WDL}" > "${INPUT_PATH}/wf.wdl" +echo "${WORKFLOW_INPUTS}" > "${INPUT_PATH}/wf.inputs.json" +echo "${WORKFLOW_OPTIONS}" > "${INPUT_PATH}/wf.options.json" + +# Set the working directory to the location of the scripts +readonly SCRIPT_DIR=$(dirname $0) +cd "${SCRIPT_DIR}" + +# Execute the wdl_runner +python -u wdl_runner.py \ + --wdl "${INPUT_PATH}"/wf.wdl \ + --workflow-inputs "${INPUT_PATH}"/wf.inputs.json \ + --working-dir "${WORKSPACE}" \ + --workflow-options "${INPUT_PATH}"/wf.options.json \ + --output-dir "${OUTPUTS}" + diff --git a/runners/cromwell_on_google/tools/monitor_wdl_pipeline.sh b/runners/cromwell_on_google/tools/monitor_wdl_pipeline.sh new file mode 100755 index 0000000..457f12b --- /dev/null +++ b/runners/cromwell_on_google/tools/monitor_wdl_pipeline.sh @@ -0,0 +1,232 @@ +#!/bin/bash + +# Copyright 2017 Google Inc. +# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +# monitor_wdl_pipeline.sh +# +# Simple script that can be used to monitor the status of a WDL pipeline +# launched using workflows/wdl_pipeline.sh or workflows/wdl_pipeline_from_git.sh +# and run by the Broad's Cromwell (https://github.com/broadinstitute/cromwell). +# +# The script accepts an operation ID for a pipeline, extracts the +# LOGGING, WORKSPACE, and OUTPUT directories from the operation and then +# examines these directories to glean some insights into the status of the +# operation. 
+# +# Note: if the WORKSPACE and/or OUTPUT directories for the specified operation +# are already populated (for example by another operation), this script +# will emit incorrect output. + +set -o errexit +set -o nounset + +readonly SCRIPT_DIR=$(dirname "${0}") +readonly REPO_ROOT=$(cd ${SCRIPT_DIR}/../../ && pwd) + +# Bring in polling utility functions +source ${REPO_ROOT}/tools/operations_util.sh + +# FUNCTIONS + +# gsutil_ls +# +# Run "gsutil ls" masking stderr output and the non-zero exit code +function gsutil_ls() { + gsutil ls $* 2>/dev/null || true +} +readonly -f gsutil_ls + +# line_count +# +# Emit the number of lines of text in a string. +function line_count() { + local lines="${1}" + if [[ -z "${lines}" ]]; then + echo "0" + else + echo "${lines}" | wc -l + fi +} +readonly line_count + +# indent +# +# Indent stdin two spaces +function indent() { + sed -e 's#^# #' +} +readonly -f indent + +# MAIN + +if [[ $# -lt 1 ]]; then + 2>&1 echo "Usage: $0 OPERATION-ID " + exit 1 +fi + +readonly OPERATION_ID="${1}" +readonly POLL_INTERVAL_SECONDS="${2:-60}" # Default: 60 seconds between requests +readonly POLL_WAIT_MAX="${3:-}" # Default: wait forever + +# Get GCS paths from the operation +LOGGING=$(get_operation_value "${OPERATION_ID}" \ + "metadata.request.pipelineArgs.logging.gcsPath") +WORKSPACE=$(get_operation_value "${OPERATION_ID}" \ + "metadata.request.pipelineArgs.inputs.WORKSPACE") +OUTPUTS=$(get_operation_value "${OPERATION_ID}" \ + "metadata.request.pipelineArgs.inputs.OUTPUTS") + +echo "Logging: ${LOGGING}" +echo "Workspace: ${WORKSPACE}" +echo "Outputs: ${OUTPUTS}" + +# Loop until operation complete or POLL_WAIT_MAX +POLL_WAIT_TOTAL=0 +LOGS_COUNT=-1 +PREEMPT_COUNT=-1 +OUTPUT_COUNT=-1 +while [[ $(get_operation_done_status "${OPERATION_ID}") == "false" ]]; do + + echo + echo "$(date '+%Y-%m-%d %H:%M:%S'): operation not complete" + + # Check that we haven't been polling too long + if [[ -n "${POLL_WAIT_MAX}" ]] && \ + [[ "${POLL_WAIT_TOTAL}" -ge 
"${POLL_WAIT_MAX}" ]]; then
+    echo "Total wait time (${POLL_WAIT_TOTAL} seconds) has exceeded the max (${POLL_WAIT_MAX})."
+    exit 2
+  fi
+
+  # Gather info. These directories can be empty for a while during execution.
+
+  # Logs should be 0 to 3 files and should be the first to show up
+  GS_LOGS=$(gsutil_ls "${LOGGING}/${OPERATION_ID#operations/}*")
+  GS_WS=$(gsutil_ls "${WORKSPACE}/**" | sed -e "s#^${WORKSPACE}/##")
+  GS_OUT=$(gsutil_ls "${OUTPUTS}")
+
+  # The Cromwell filesystem is going to be:
+  #   <workspace>/<workflow-name>/<workflow-id>/call-<call-name>
+  # with some variety to what is under each "call-<call-name>"
+  #
+  # If the call is unsharded (not a scatter), then the call-<call-name>
+  # directory contains objects like:
+  #   exec.sh -- The code that gets executed on the VM.
+  #   <call-name>-rc.txt -- Contains the return code for the stage.
+  #     This file will not exist until the stage completes.
+  #     If the VM is preempted, this file will never be written.
+  #   <call-name>-stdout.txt -- stdout captured during execution of exec.sh.
+  #   <call-name>-stderr.txt -- stderr captured during execution of exec.sh.
+  #   <operation-id>.txt -- Genomics Pipelines operations log.
+  #
+  # If the stage is sharded, then for each shard there will be a folder under
+  # call-<call-name> named "shard-<#>" where numbering starts at 0.
+  #
+  # If the Pipelines VM was preempted, then a subdirectory will be found under
+  # call-<call-name> named "attempt-<#>" where numbering starts at 2.
+  #
+  # For sharded and preemptible stages then, one may find:
+  #   call-<call-name>/shard-<#>/attempt-<#>
+
+  # From the list of the files in the workspace directory, extract useful
+  # sets of files from which we can glean status:
+  #   All "exec.sh" files: indicates that the call and/or shard and/or attempt
+  #     has started.
+  #   All "<call-name>-rc.txt" files: indicates that the call and/or shard
+  #     and/or attempt has completed.
+  #   All "attempt-*/exec.sh" files: indicates that a previous attempt failed,
+  #     assumed to be due to preemption (this is *not* checked explicitly).
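+  #
+  # For example, with a hypothetical workflow named "my-wf" and a call named
+  # "count", a shard that was preempted once might leave objects such as:
+  #   my-wf/1a2b3c4d/call-count/shard-0/exec.sh
+  #   my-wf/1a2b3c4d/call-count/shard-0/attempt-2/exec.sh
+  #   my-wf/1a2b3c4d/call-count/shard-0/attempt-2/count-rc.txt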
+
+  WS_EXECS=$(echo "${GS_WS}" | grep '/exec.sh$' || true)
+  WS_RCS=$(echo "${GS_WS}" | grep -- '-rc\.txt$' || true)
+  WS_PREEMPTS=$(echo "${GS_WS}" | grep 'attempt-[0-9]\+/exec.sh$' || true)
+
+  # Emit status
+
+  GS_LOGS_COUNT=$(line_count "${GS_LOGS}")
+  if [[ "${GS_LOGS_COUNT}" -ne "${LOGS_COUNT}" ]]; then
+    if [[ -n "${GS_LOGS}" ]]; then
+      echo "Operation logs found:"
+      echo "${GS_LOGS}" | sed -e "s#^${LOGGING}/##g" | indent
+    else
+      echo "No operation logs found."
+    fi
+    LOGS_COUNT="${GS_LOGS_COUNT}"
+  fi
+
+  if [[ -n "${WS_EXECS}" ]]; then
+    if [[ -n "${WS_RCS}" ]]; then
+      echo "Calls (including shards) completed: $(line_count "${WS_RCS}")"
+    fi
+
+    # To determine what is running, find all call or call/shard paths that
+    # have an "exec.sh" with no "*-rc.txt" file.
+    WS_CALLS_STARTED=$(
+      echo "${WS_EXECS}" \
+        | sed -e 's#[^/]\+/[^/]\+/##' \
+              -e 's#/exec.sh##' \
+              -e 's#/attempt-[0-9]\+##' \
+        | sort -u)
+    WS_CALLS_COMPLETE=$(
+      echo "${WS_RCS}" \
+        | sed -e 's#[^/]\+/[^/]\+/##' \
+              -e 's#/[^/]\+-rc.txt##' \
+              -e 's#/attempt-[0-9]\+##' \
+        | sort -u)
+
+    IN_PROGRESS=$(\
+      comm -2 -3 \
+        <(echo "${WS_CALLS_STARTED}") \
+        <(echo "${WS_CALLS_COMPLETE}"))
+    if [[ -n "${IN_PROGRESS}" ]]; then
+      echo "Calls started but not complete:"
+      echo "${IN_PROGRESS}" | indent
+    else
+      echo "No calls currently in progress."
+      echo "(Transitioning to next stage or copying final output)."
+    fi
+
+    WS_PREEMPT_COUNT=$(line_count "${WS_PREEMPTS}")
+    if [[ "${WS_PREEMPT_COUNT}" -ne "${PREEMPT_COUNT}" ]]; then
+      echo "Total preemptions: ${WS_PREEMPT_COUNT}"
+      PREEMPT_COUNT="${WS_PREEMPT_COUNT}"
+    fi
+  fi
+
+  GS_OUTPUT_COUNT=$(line_count "${GS_OUT}")
+  if [[ "${GS_OUTPUT_COUNT}" -ne "${OUTPUT_COUNT}" ]]; then
+    echo "There are ${GS_OUTPUT_COUNT} output files"
+
+    OUTPUT_COUNT="${GS_OUTPUT_COUNT}"
+  fi
+
+  echo "Sleeping ${POLL_INTERVAL_SECONDS} seconds"
+  sleep "${POLL_INTERVAL_SECONDS}"
+  POLL_WAIT_TOTAL=$((POLL_WAIT_TOTAL + POLL_INTERVAL_SECONDS))
+done
+
+echo
+echo "$(date '+%Y-%m-%d %H:%M:%S'): operation complete"
+
+echo "Completed operation status information"
+get_operation_status "${OPERATION_ID}" | indent
+
+echo "Operation output"
+gsutil_ls "${OUTPUTS}" | indent
+
+echo
+echo "Preemption retries:"
+WS_PREEMPTS=$(gsutil_ls "${WORKSPACE}/**/attempt-*/exec.sh")
+if [[ -n "${WS_PREEMPTS}" ]]; then
+  echo "${WS_PREEMPTS}" \
+    | sed -e "s#^${WORKSPACE}/[^/]\+/[^/]\+/##" \
+          -e 's#/exec.sh##' \
+    | indent
+
+  echo "Total preemptions: $(echo "${WS_PREEMPTS}" | wc -l)"
+else
+  echo "None" | indent
+fi
diff --git a/runners/cromwell_on_google/wdl_pipeline.yaml b/runners/cromwell_on_google/wdl_pipeline.yaml
new file mode 100644
index 0000000..08335d2
--- /dev/null
+++ b/runners/cromwell_on_google/wdl_pipeline.yaml
@@ -0,0 +1,24 @@
+name: WDL Runner
+description: Run a workflow defined by a WDL file
+
+inputParameters:
+- name: WDL
+  description: Workflow definition
+- name: WORKFLOW_INPUTS
+  description: Workflow inputs
+- name: WORKFLOW_OPTIONS
+  description: Workflow options
+
+- name: WORKSPACE
+  description: Cloud Storage path for intermediate files
+- name: OUTPUTS
+  description: Cloud Storage path for output files
+
+docker:
+  imageName: gcr.io/broad-dsde-outreach/wdl_runner:2017_06_13
+
+  cmd: >
+    /wdl_runner/wdl_runner.sh
+
+resources:
+  minimumRamGb: 3.75