diff --git a/MIGRATION.md b/MIGRATION.md
index 62b4b25cb..4669113fe 100644
--- a/MIGRATION.md
+++ b/MIGRATION.md
@@ -83,4 +83,43 @@ The following workflow options have changed names:
## Database Migrations
-... to be determined ...
+Cromwell's database structure has changed significantly between 0.19 and 0.21.
+All pre-existing data has to be transformed/moved to new tables in order to be usable.
+The migration process can be split into 2 steps:
+
+### Restart Migration
+
+Cromwell 0.21 will migrate workflows that were in **Running** state to the new database structure in order to attempt to resume them once the server restarts.
+This will ensure that even if cromwell is stopped while workflows are still running they aren't lost.
+No particular action/configuration is required for this step.
+
+### Metadata Migration (MySQL Only)
+
+In order to keep metadata from previous (and current) workflow runs, all the data has to be moved to a new centralized table.
+Depending on the number and shape of the workflows in your database **this step can be significantly time and space consuming**.
+It is not possible to give an accurate estimation due to the multiple variables in play like number of workflows, complexity (number of tasks, scatters, attempts per task, etc...), hardware performance (of the database, of the machine running cromwell), backends used, etc...
+
+However, a good rule of thumb is to make sure that **your database has enough disk space to grow by a factor of 10**.
+This is due to the fact that data is de-normalized during the migration. In particular all inputs and outputs, which means the more complex / large your outputs are (large arrays, etc...), the more your database will grow.
+Also be aware that **the migration can take several hours for substantially large databases.**
+
+#### Important Notes
+
+* For better performance, make sure the flag `rewriteBatchedStatements` is set to `true`. This can be done by adding it to your database connection URL.
+
+e.g.:
+
+    jdbc:mysql://localhost:3306/cromwell_db?rewriteBatchedStatements=true
+
+See the [mysql doc](https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html) for more information.
+
+* Because of the nature of WDL inputs and outputs, as well as the way they were stored up until Cromwell 0.19, it is necessary to pull them out of the database, process them one by one in Cromwell, and re-insert them.
+For that reason, if your database contains workflows with very large inputs or outputs (for example `Array`s of several thousands of elements, large matrices, etc...),
+you might want to tune the `migration-read-batch-size` and `migration-write-batch-size` configuration fields (see the `database` section in `application.conf`)
+
+ * `migration-read-batch-size` sets the number of rows that should be retrieved at the same time from the database. Once every row is processed, the next set is retrieved until there are no more.
+If your workflows/tasks have very large inputs or outputs, a number too large here could cause out of memory errors, or extended waiting times to pull the data from the database.
+On the other hand, a number too small could decrease performance by causing more round-trips to the database with the associated overhead.
+
+ * `migration-write-batch-size` sets the number of insert statements that are buffered before being committed in a transaction. This decreases the number of queries to the database, while making sure the batch never gets too big.
+You might consider decreasing this field if your workflows/tasks have very large `String`s as input or output (for example several MB of text).
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 176eb13d9..68ef901ee 100644
--- a/build.sbt
+++ b/build.sbt
@@ -9,13 +9,19 @@ lazy val gcsFileSystem = (project in file("filesystems/gcs"))
.settings(gcsFileSystemSettings:_*)
.withTestSettings
+lazy val databaseCore = (project in file("database/core"))
+ .settings(databaseCoreSettings:_*)
+ .withTestSettings
+
lazy val databaseSql = (project in file("database/sql"))
.settings(databaseSqlSettings:_*)
+ .dependsOn(databaseCore)
.withTestSettings
lazy val databaseMigration = (project in file("database/migration"))
.settings(databaseMigrationSettings: _*)
.dependsOn(core)
+ .dependsOn(databaseCore)
.withTestSettings
lazy val services = (project in file("services"))
@@ -82,6 +88,7 @@ lazy val root = (project in file("."))
// Full list of all sub-projects to build with the root (ex: include in `sbt test`)
.aggregate(core)
.aggregate(gcsFileSystem)
+ .aggregate(databaseCore)
.aggregate(databaseSql)
.aggregate(databaseMigration)
.aggregate(services)
diff --git a/core/src/main/resources/application.conf b/core/src/main/resources/application.conf
index 43bb3ed6d..012720773 100644
--- a/core/src/main/resources/application.conf
+++ b/core/src/main/resources/application.conf
@@ -377,6 +377,11 @@ database {
password = "pass"
connectionTimeout = 5000
}
+
+ migration {
+ read-batch-size = 100000
+ write-batch-size = 100000
+ }
}
}
}
diff --git a/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala b/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala
index 905c0f03e..c417ad3ee 100644
--- a/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala
+++ b/core/src/main/scala/cromwell/core/simpleton/WdlValueSimpleton.scala
@@ -1,6 +1,6 @@
package cromwell.core.simpleton
-import wdl4s.values.{WdlArray, WdlMap, WdlObjectLike, WdlPrimitive, WdlValue}
+import wdl4s.values._
case class WdlValueSimpleton(simpletonKey: String, simpletonValue: WdlPrimitive)
@@ -29,6 +29,7 @@ object WdlValueSimpleton {
case WdlArray(_, arrayValue) => arrayValue.zipWithIndex flatMap { case (arrayItem, index) => arrayItem.simplify(s"$name[$index]") }
case WdlMap(_, mapValue) => mapValue flatMap { case (key, value) => value.simplify(s"$name:${key.valueString.escapeMeta}") }
case wdlObject: WdlObjectLike => wdlObject.value flatMap { case (key, value) => value.simplify(s"$name:${key.escapeMeta}") }
+ case other => throw new Exception(s"Cannot simplify wdl value $other of type ${other.wdlType}")
}
}
diff --git a/database/core/src/main/scala/cromwell/database/core/SqlConfiguration.scala b/database/core/src/main/scala/cromwell/database/core/SqlConfiguration.scala
new file mode 100644
index 000000000..9c5fcc564
--- /dev/null
+++ b/database/core/src/main/scala/cromwell/database/core/SqlConfiguration.scala
@@ -0,0 +1,13 @@
+package cromwell.database.core
+
+import com.typesafe.config.ConfigFactory
+import lenthall.config.ScalaConfig._
+
+object SqlConfiguration {
+ lazy val rootConfig = ConfigFactory.load()
+ private lazy val rootDatabaseConfig = rootConfig.getConfig("database")
+ private lazy val databaseConfigName = rootDatabaseConfig.getStringOption("config")
+ lazy val defaultDatabaseConfig = databaseConfigName.map(getDatabaseConfig).getOrElse(rootDatabaseConfig)
+
+ def getDatabaseConfig(path: String) = rootDatabaseConfig.getConfig(path)
+}
diff --git a/database/migration/src/main/resources/changelog.xml b/database/migration/src/main/resources/changelog.xml
index c127ebc37..c8562d5ab 100644
--- a/database/migration/src/main/resources/changelog.xml
+++ b/database/migration/src/main/resources/changelog.xml
@@ -44,6 +44,8 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/CreateAndLoadTmpExecutionMigration.sql b/database/migration/src/main/resources/changesets/migration/metadata/CreateAndLoadTmpExecutionMigration.sql
new file mode 100644
index 000000000..060f9c8c0
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/CreateAndLoadTmpExecutionMigration.sql
@@ -0,0 +1,41 @@
+CREATE TABLE TMP_EXECUTION_MIGRATION LIKE EXECUTION;
+
+INSERT INTO TMP_EXECUTION_MIGRATION (
+ EXECUTION_ID,
+ WORKFLOW_EXECUTION_ID,
+ RESULTS_CLONED_FROM,
+ CALL_FQN,
+ STATUS,
+ IDX,
+ RC,
+ START_DT,
+ END_DT,
+ ALLOWS_RESULT_REUSE,
+ DOCKER_IMAGE_HASH,
+ EXECUTION_HASH,
+ ATTEMPT,
+ BACKEND_TYPE
+)
+ SELECT
+ EXECUTION_ID,
+ WORKFLOW_EXECUTION_ID,
+ RESULTS_CLONED_FROM,
+ CALL_FQN,
+ STATUS,
+ IDX,
+ RC,
+ START_DT,
+ END_DT,
+ ALLOWS_RESULT_REUSE,
+ DOCKER_IMAGE_HASH,
+ EXECUTION_HASH,
+ ATTEMPT,
+ BACKEND_TYPE
+ FROM EXECUTION e
+ WHERE e.CALL_FQN NOT LIKE '%$%' AND -- filter out scatters
+ NOT (e.IDX = -1 AND EXISTS ( -- filter out collectors
+ SELECT 1 FROM EXECUTION e2 WHERE
+ e2.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID AND
+ e2.CALL_FQN = e.CALL_FQN AND
+ e2.IDX != -1)
+ );
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/CreateTmpSymbolTable.sql b/database/migration/src/main/resources/changesets/migration/metadata/CreateTmpSymbolTable.sql
new file mode 100644
index 000000000..8488987e5
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/CreateTmpSymbolTable.sql
@@ -0,0 +1,11 @@
+CREATE TABLE TMP_SYMBOL
+(
+ TMP_SYMBOL_ID INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
+ WORKFLOW_EXECUTION_UUID VARCHAR(100) NOT NULL,
+ SYMBOL_NAME VARCHAR(100),
+ SYMBOL_SCOPE VARCHAR(255),
+ SYMBOL_INDEX INT,
+ SYMBOL_ATTEMPT INT,
+ WDL_VALUE LONGTEXT,
+ WDL_TYPE VARCHAR(100)
+);
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/ExecutionInfoTableMigration.sql b/database/migration/src/main/resources/changesets/migration/metadata/ExecutionInfoTableMigration.sql
new file mode 100644
index 000000000..c4be8a917
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/ExecutionInfoTableMigration.sql
@@ -0,0 +1,38 @@
+INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ CASE ei.INFO_KEY
+ WHEN '$log_stdout' THEN 'stdout'
+ WHEN '$log_stderr' THEN 'stderr'
+ WHEN 'JES_RUN_ID' THEN 'jobId'
+ WHEN 'JES_STATUS' THEN 'backendStatus'
+ WHEN 'SGE_JOB_NUMBER' THEN 'jobNumber'
+ WHEN 'LSF_JOB_NUMBER' THEN 'jobNumber'
+ ELSE
+ IF(ei.INFO_KEY LIKE '$log_%', -- backend log
+ CONCAT(
+ 'backendLogs:', -- prepend metadata prefix
+ SUBSTRING(ei.INFO_KEY, 6, LENGTH(ei.INFO_KEY) - 5) -- remove $log_ prefix from the key
+ ),
+ ei.INFO_KEY -- Just put the key otherwise
+ ) END,
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ ei.INFO_VALUE,
+ 'string',
+ NOW()
+ FROM EXECUTION_INFO ei
+ LEFT JOIN TMP_EXECUTION_MIGRATION e ON ei.EXECUTION_ID = e.EXECUTION_ID
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID
+ WHERE
+ ei.INFO_VALUE IS NOT NULL;
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/LoadCallOutputSymbols.sql b/database/migration/src/main/resources/changesets/migration/metadata/LoadCallOutputSymbols.sql
new file mode 100644
index 000000000..421bf8a68
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/LoadCallOutputSymbols.sql
@@ -0,0 +1,34 @@
+INSERT INTO TMP_SYMBOL (
+ WORKFLOW_EXECUTION_UUID,
+ SYMBOL_NAME,
+ SYMBOL_SCOPE,
+ SYMBOL_INDEX,
+ SYMBOL_ATTEMPT,
+ WDL_VALUE,
+ WDL_TYPE
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ s.NAME,
+ e.CALL_FQN,
+ e.IDX,
+ e.ATTEMPT,
+ s.WDL_VALUE,
+ s.WDL_TYPE
+ FROM SYMBOL s
+ JOIN WORKFLOW_EXECUTION we ON
+ we.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+ JOIN TMP_EXECUTION_MIGRATION e ON
+ e.CALL_FQN = s.SCOPE AND
+ e.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID AND
+ s.`INDEX` = e.IDX
+ WHERE
+ s.IO = 'OUTPUT' AND
+ NOT EXISTS ( -- filter out earlier attempts
+ SELECT 1
+ FROM TMP_EXECUTION_MIGRATION e3
+ WHERE
+ e3.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID AND
+ e3.CALL_FQN = e.CALL_FQN AND
+ e3.IDX = e.IDX AND
+ e3.ATTEMPT > e.ATTEMPT);
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/LoadInputSymbols.sql b/database/migration/src/main/resources/changesets/migration/metadata/LoadInputSymbols.sql
new file mode 100644
index 000000000..a9a5f43e8
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/LoadInputSymbols.sql
@@ -0,0 +1,26 @@
+INSERT INTO TMP_SYMBOL (
+ WORKFLOW_EXECUTION_UUID,
+ SYMBOL_NAME,
+ SYMBOL_SCOPE,
+ SYMBOL_INDEX,
+ SYMBOL_ATTEMPT,
+ WDL_VALUE,
+ WDL_TYPE
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ s.NAME,
+ s.SCOPE,
+ e.IDX,
+ e.ATTEMPT,
+ s.WDL_VALUE,
+ s.WDL_TYPE
+ FROM SYMBOL s
+ JOIN WORKFLOW_EXECUTION we ON
+ we.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+ LEFT JOIN TMP_EXECUTION_MIGRATION e ON
+ e.CALL_FQN = s.SCOPE AND
+ e.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+ -- Don't join on index here because inputs have index null (-1), but we want to duplicate them as many times as there are indices for a given execution
+ WHERE
+ s.IO = 'INPUT';
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/LoadWorkflowOutputSymbols.sql b/database/migration/src/main/resources/changesets/migration/metadata/LoadWorkflowOutputSymbols.sql
new file mode 100644
index 000000000..5124ff1a8
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/LoadWorkflowOutputSymbols.sql
@@ -0,0 +1,19 @@
+INSERT INTO TMP_SYMBOL (
+ WORKFLOW_EXECUTION_UUID,
+ SYMBOL_NAME,
+ SYMBOL_SCOPE,
+ WDL_VALUE,
+ WDL_TYPE
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ s.NAME,
+ s.SCOPE,
+ s.WDL_VALUE,
+ s.WDL_TYPE
+ FROM SYMBOL s
+ JOIN WORKFLOW_EXECUTION we ON
+ we.WORKFLOW_EXECUTION_ID = s.WORKFLOW_EXECUTION_ID
+ WHERE
+ s.IO = 'OUTPUT' AND
+ s.REPORTABLE_RESULT = 1;
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/RuntimeAttributesTableMigration.sql b/database/migration/src/main/resources/changesets/migration/metadata/RuntimeAttributesTableMigration.sql
new file mode 100644
index 000000000..1dfb65038
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/RuntimeAttributesTableMigration.sql
@@ -0,0 +1,22 @@
+INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ CONCAT('runtimeAttributes:', ra.ATTRIBUTE_NAME),
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ ra.ATTRIBUTE_VALUE,
+ 'string',
+ NOW()
+ FROM RUNTIME_ATTRIBUTES ra
+ LEFT JOIN EXECUTION e ON ra.EXECUTION_ID = e.EXECUTION_ID
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;
diff --git a/database/migration/src/main/resources/changesets/migration/metadata/WorkflowExecutionAuxTableMigration.sql b/database/migration/src/main/resources/changesets/migration/metadata/WorkflowExecutionAuxTableMigration.sql
new file mode 100644
index 000000000..fc16df0b5
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/migration/metadata/WorkflowExecutionAuxTableMigration.sql
@@ -0,0 +1,47 @@
+INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'submittedFiles:workflow',
+ wea.WDL_SOURCE,
+ 'string',
+ NOW()
+ FROM WORKFLOW_EXECUTION_AUX wea
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = wea.WORKFLOW_EXECUTION_ID;
+
+INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'submittedFiles:inputs',
+ wea.JSON_INPUTS,
+ 'string',
+ NOW()
+ FROM WORKFLOW_EXECUTION_AUX wea
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = wea.WORKFLOW_EXECUTION_ID;
+
+INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+)
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'submittedFiles:options',
+ wea.WORKFLOW_OPTIONS,
+ 'string',
+ NOW()
+ FROM WORKFLOW_EXECUTION_AUX wea
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = wea.WORKFLOW_EXECUTION_ID;
diff --git a/database/migration/src/main/resources/changesets/standardize_column_names.xml b/database/migration/src/main/resources/changesets/standardize_column_names.xml
new file mode 100644
index 000000000..a309c8ea2
--- /dev/null
+++ b/database/migration/src/main/resources/changesets/standardize_column_names.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+ Change all Workflow UUID column names to Workflow Execution UUID.
+
+
+
+
+
+
+
+
+ Choose and implement common call/job identifiers.
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/database/migration/src/main/scala/cromwell/database/migration/WdlTransformation.scala b/database/migration/src/main/scala/cromwell/database/migration/WdlTransformation.scala
new file mode 100644
index 000000000..0f8da4898
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/WdlTransformation.scala
@@ -0,0 +1,27 @@
+package cromwell.database.migration
+
+import java.io.{ByteArrayInputStream, IOException}
+import java.util.zip.GZIPInputStream
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.commons.io.IOUtils
+import wdl4s.types.{WdlPrimitiveType, WdlType}
+
+import scala.language.postfixOps
+import scala.util.Try
+
+private [migration] object WdlTransformation {
+
+ def inflate(value: String): Try[String] = {
+ Try {
+ IOUtils.toString(new GZIPInputStream(new ByteArrayInputStream(Base64.decodeBase64(value))))
+ } recover {
+ case e: IOException => value
+ }
+ }
+
+ def coerceStringToWdl(wdlString: String, wdlType: WdlType) = wdlType match {
+ case p: WdlPrimitiveType => p.coerceRawValue(wdlString).get
+ case o => o.fromWdlString(wdlString)
+ }
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/MetadataCustomSql.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/MetadataCustomSql.scala
new file mode 100644
index 000000000..13515de2e
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/MetadataCustomSql.scala
@@ -0,0 +1,31 @@
+package cromwell.database.migration.metadata
+
+import java.time.OffsetDateTime
+
+import liquibase.change.custom.CustomSqlChange
+import liquibase.database.Database
+import liquibase.exception.ValidationErrors
+import liquibase.resource.ResourceAccessor
+import liquibase.statement.SqlStatement
+import liquibase.statement.core.RawSqlStatement
+
+object MetadataCustomSql {
+ val Offset = OffsetDateTime.now().getOffset.toString
+}
+
+abstract class MetadataCustomSql extends CustomSqlChange {
+
+ def queries: Array[String]
+
+ override def generateStatements(database: Database): Array[SqlStatement] = {
+ queries map { query => new RawSqlStatement(query) }
+ }
+
+ override def setUp(): Unit = ()
+
+ override def validate(database: Database): ValidationErrors = {
+ new ValidationErrors()
+ }
+
+ override def setFileOpener(resourceAccessor: ResourceAccessor): Unit = ()
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/ExecutionTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/ExecutionTableMigration.scala
new file mode 100644
index 000000000..2792c00cc
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/ExecutionTableMigration.scala
@@ -0,0 +1,249 @@
+package cromwell.database.migration.metadata.table
+
+import cromwell.database.migration.metadata.MetadataCustomSql
+import MetadataCustomSql._
+
+/**
+ * Transform data from the EXECUTION table to metadata
+ */
+class ExecutionTableMigration extends MetadataCustomSql {
+
+ override def queries: Array[String] = {
+ Array(
+ s"""INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'start',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ DATE_FORMAT(e.START_DT, '%Y-%m-%dT%T.%f$Offset'),
+ 'string',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID
+ WHERE
+ e.START_DT IS NOT NULL;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'backend',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ e.BACKEND_TYPE,
+ 'string',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ ,
+ s"""INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'end',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ DATE_FORMAT(e.END_DT, '%Y-%m-%dT%T.%f$Offset'),
+ 'string',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID
+ WHERE
+ e.END_DT IS NOT NULL;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'executionStatus',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ e.STATUS,
+ 'string',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'returnCode',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ e.RC,
+ 'int',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID
+ WHERE e.RC IS NOT NULL;"""
+ ,
+ s"""INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'cache:allowResultReuse',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ CASE e.ALLOWS_RESULT_REUSE WHEN 1 THEN 'true' ELSE 'false' END,
+ 'boolean',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ ,
+ s"""INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_VALUE,
+ METADATA_VALUE_TYPE,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'preemptible',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ CASE ra.ATTRIBUTE_VALUE >= ATTEMPT WHEN 1 THEN 'true' ELSE 'false' END,
+ 'boolean',
+ NOW()
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID
+ LEFT JOIN RUNTIME_ATTRIBUTES ra ON e.EXECUTION_ID = ra.EXECUTION_ID AND ra.ATTRIBUTE_NAME = 'preemptible'
+ WHERE ra.ATTRIBUTE_VALUE IS NOT NULL;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'runtimeAttributes',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ '1900-01-01'
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'inputs',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ '1900-01-01'
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'outputs',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ '1900-01-01'
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ ,
+ """INSERT INTO METADATA_JOURNAL (
+ WORKFLOW_EXECUTION_UUID,
+ METADATA_KEY,
+ CALL_FQN,
+ JOB_SCATTER_INDEX,
+ JOB_RETRY_ATTEMPT,
+ METADATA_TIMESTAMP
+ )
+ SELECT
+ WORKFLOW_EXECUTION_UUID,
+ 'executionEvents[]',
+ CALL_FQN,
+ IDX,
+ ATTEMPT,
+ '1900-01-01'
+ FROM TMP_EXECUTION_MIGRATION e
+ JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;"""
+ )
+ }
+
+ override def getConfirmationMessage: String = "Execution Table migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/FailureEventTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/FailureEventTableMigration.scala
new file mode 100644
index 000000000..60d12342e
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/FailureEventTableMigration.scala
@@ -0,0 +1,62 @@
+package cromwell.database.migration.metadata.table
+
+import cromwell.database.migration.metadata.MetadataCustomSql
+
+class FailureEventTableMigration extends MetadataCustomSql {
+ import MetadataCustomSql._
+
+ override def queries: Array[String] = {
+ Array(
+ """
+ |INSERT INTO METADATA_JOURNAL (
+ | WORKFLOW_EXECUTION_UUID,
+ | METADATA_KEY,
+ | CALL_FQN,
+ | JOB_SCATTER_INDEX,
+ | JOB_RETRY_ATTEMPT,
+ | METADATA_VALUE,
+ | METADATA_VALUE_TYPE,
+ | METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | CONCAT('failures[', fe.FAILURE_EVENT_ID ,']:failure'),
+ | CALL_FQN,
+ | IDX,
+ | ATTEMPT,
+ | fe.EVENT_MESSAGE,
+ | 'string',
+ | NOW()
+ |FROM FAILURE_EVENT fe
+ | LEFT JOIN EXECUTION e ON fe.EXECUTION_ID = e.EXECUTION_ID
+ | JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;
+ """.stripMargin,
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ | WORKFLOW_EXECUTION_UUID,
+ | METADATA_KEY,
+ | CALL_FQN,
+ | JOB_SCATTER_INDEX,
+ | JOB_RETRY_ATTEMPT,
+ | METADATA_VALUE,
+ | METADATA_VALUE_TYPE,
+ | METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | CONCAT('failures[', fe.FAILURE_EVENT_ID ,']:timestamp'),
+ | CALL_FQN,
+ | IDX,
+ | ATTEMPT,
+ | DATE_FORMAT(fe.EVENT_TIMESTAMP, '%Y-%m-%dT%T.%f$Offset'),
+ | 'string',
+ | NOW()
+ |FROM FAILURE_EVENT fe
+ | LEFT JOIN EXECUTION e ON fe.EXECUTION_ID = e.EXECUTION_ID
+ | JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;
+ """.stripMargin
+ )
+ }
+
+ override def getConfirmationMessage: String = "Failure Table migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableDescriptionMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableDescriptionMigration.scala
new file mode 100644
index 000000000..8ee97fcc9
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableDescriptionMigration.scala
@@ -0,0 +1,37 @@
+package cromwell.database.migration.metadata.table.executionevent
+
+import cromwell.database.migration.metadata.MetadataCustomSql
+
+class ExecutionEventTableDescriptionMigration extends MetadataCustomSql {
+
+ override def queries: Array[String] = {
+ Array(
+ """
+ |INSERT INTO METADATA_JOURNAL (
+ | WORKFLOW_EXECUTION_UUID,
+ | METADATA_KEY,
+ | CALL_FQN,
+ | JOB_SCATTER_INDEX,
+ | JOB_RETRY_ATTEMPT,
+ | METADATA_VALUE,
+ | METADATA_VALUE_TYPE,
+ | METADATA_TIMESTAMP
+ |)
+ | SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | CONCAT('executionEvents[', ev.EVENT_ID ,']:description'),
+ | CALL_FQN,
+ | IDX,
+ | ATTEMPT,
+ | ev.DESCRIPTION,
+ | 'string',
+ | NOW()
+ | FROM EXECUTION_EVENT ev
+ | LEFT JOIN EXECUTION e ON ev.EXECUTION_ID = e.EXECUTION_ID
+ | JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;
+ """.stripMargin
+ )
+ }
+
+ override def getConfirmationMessage: String = "Execution Event Table (Description field) migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableEndMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableEndMigration.scala
new file mode 100644
index 000000000..d3d80e74f
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableEndMigration.scala
@@ -0,0 +1,38 @@
+package cromwell.database.migration.metadata.table.executionevent
+
+import cromwell.database.migration.metadata.MetadataCustomSql
+import MetadataCustomSql._
+
+class ExecutionEventTableEndMigration extends MetadataCustomSql {
+
+ override def queries: Array[String] = {
+ Array(
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ | WORKFLOW_EXECUTION_UUID,
+ | METADATA_KEY,
+ | CALL_FQN,
+ | JOB_SCATTER_INDEX,
+ | JOB_RETRY_ATTEMPT,
+ | METADATA_VALUE,
+ | METADATA_VALUE_TYPE,
+ | METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | CONCAT('executionEvents[', ev.EVENT_ID ,']:endTime'),
+ | CALL_FQN,
+ | IDX,
+ | ATTEMPT,
+ | DATE_FORMAT(ev.END_DT, '%Y-%m-%dT%T.%f$Offset'),
+ | 'string',
+ | NOW()
+ |FROM EXECUTION_EVENT ev
+ | LEFT JOIN EXECUTION e ON ev.EXECUTION_ID = e.EXECUTION_ID
+ | JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;
+ """.stripMargin
+ )
+ }
+
+ override def getConfirmationMessage: String = "Execution Event Table (End field) migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableStartMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableStartMigration.scala
new file mode 100644
index 000000000..e0c9b4184
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/executionevent/ExecutionEventTableStartMigration.scala
@@ -0,0 +1,37 @@
+package cromwell.database.migration.metadata.table.executionevent
+
+import cromwell.database.migration.metadata.MetadataCustomSql
+import MetadataCustomSql._
+
+class ExecutionEventTableStartMigration extends MetadataCustomSql {
+
+ override def queries: Array[String] = {
+ Array(
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ | WORKFLOW_EXECUTION_UUID,
+ | METADATA_KEY,
+ | CALL_FQN,
+ | JOB_SCATTER_INDEX,
+ | JOB_RETRY_ATTEMPT,
+ | METADATA_VALUE,
+ | METADATA_VALUE_TYPE,
+ | METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | CONCAT('executionEvents[', ev.EVENT_ID ,']:startTime'),
+ | CALL_FQN,
+ | IDX,
+ | ATTEMPT,
+ | DATE_FORMAT(ev.START_DT, '%Y-%m-%dT%T.%f$Offset'),
+ | 'string',
+ | NOW()
+ |FROM EXECUTION_EVENT ev
+ | LEFT JOIN EXECUTION e ON ev.EXECUTION_ID = e.EXECUTION_ID
+ | JOIN WORKFLOW_EXECUTION we ON we.WORKFLOW_EXECUTION_ID = e.WORKFLOW_EXECUTION_ID;
+ """.stripMargin)
+ }
+
+ override def getConfirmationMessage: String = "Execution Event Table (Start field) migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/CallOutputSymbolTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/CallOutputSymbolTableMigration.scala
new file mode 100644
index 000000000..fbd5529c1
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/CallOutputSymbolTableMigration.scala
@@ -0,0 +1,34 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.PreparedStatement
+
+import wdl4s.values._
+
+class CallOutputSymbolTableMigration extends SymbolTableMigration {
+ override def processSymbol(statement: PreparedStatement,
+ rowIndex: Int,
+ workflowUuid: String,
+ symbolName: String,
+ symbolScope: String,
+ symbolIndex: Option[Int],
+ symbolAttempt: Option[Int],
+ wdlValue: WdlValue): Int = {
+
+ (symbolIndex, symbolAttempt) match {
+ case (Some(index), Some(attempt)) =>
+ val metadataStatementForCall = new MetadataStatementForCall(statement,
+ workflowUuid,
+ symbolScope,
+ index,
+ attempt
+ )
+
+ addWdlValue(s"outputs:$symbolName", wdlValue, metadataStatementForCall)
+ case _ =>
+ logger.warn(s"Found output without index or attempt: [$workflowUuid] $symbolScope - $symbolName:$symbolIndex:$symbolAttempt")
+ 0
+ }
+ }
+
+ override def getConfirmationMessage: String = "Call outputs from Symbol Table migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/InputSymbolTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/InputSymbolTableMigration.scala
new file mode 100644
index 000000000..81756d017
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/InputSymbolTableMigration.scala
@@ -0,0 +1,38 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.PreparedStatement
+
+import wdl4s.values._
+
+class InputSymbolTableMigration extends SymbolTableMigration {
+
+ override def processSymbol(statement: PreparedStatement,
+ rowIndex: Int,
+ workflowUuid: String,
+ symbolName: String,
+ symbolScope: String,
+ symbolIndex: Option[Int],
+ symbolAttempt: Option[Int],
+ wdlValue: WdlValue): Int = {
+
+ (symbolIndex, symbolAttempt) match {
+      case (Some(index), Some(attempt)) =>
+ // Call scoped
+ val metadataStatementForCall = new MetadataStatementForCall(statement,
+ workflowUuid,
+ symbolScope,
+ index,
+ attempt
+ )
+
+ addWdlValue(s"inputs:$symbolName", wdlValue, metadataStatementForCall)
+ case (None, None) if !symbolScope.contains('.') =>
+ val metadataStatementForWorkflow = new MetadataStatementForWorkflow(statement, workflowUuid)
+ addWdlValue(s"inputs:$symbolScope.$symbolName", wdlValue, metadataStatementForWorkflow)
+ case _ =>
+ 0
+ }
+ }
+
+ override def getConfirmationMessage: String = "Inputs from Symbol Table migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/MetadataStatement.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/MetadataStatement.scala
new file mode 100644
index 000000000..a938ebcdf
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/MetadataStatement.scala
@@ -0,0 +1,117 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.{PreparedStatement, Timestamp, Types}
+import java.time.format.DateTimeFormatter
+import java.time.{OffsetDateTime, ZoneId, ZoneOffset}
+
+import liquibase.database.jvm.JdbcConnection
+import org.slf4j.LoggerFactory
+import wdl4s.values.{WdlBoolean, WdlFloat, WdlInteger, WdlValue}
+
+object MetadataStatement {
+ val WorkflowIdIdx = 1
+ val KeyIdx = 2
+ val CallFqnIdx = 3
+ val CallIndexIdx = 4
+ val CallAttemptIdx = 5
+ val ValueIdx = 6
+ val TimestampIdx = 7
+ val ValueTypeIdx = 8
+
+ def makeStatement(connection: JdbcConnection): PreparedStatement = connection.prepareStatement(
+ """
+ |INSERT INTO METADATA_JOURNAL
+ |(WORKFLOW_EXECUTION_UUID, METADATA_KEY, CALL_FQN, JOB_SCATTER_INDEX, JOB_RETRY_ATTEMPT, METADATA_VALUE, METADATA_TIMESTAMP, METADATA_VALUE_TYPE)
+ |VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """.stripMargin)
+
+ implicit class OffsetDateTimeToSystemTimestamp(val offsetDateTime: OffsetDateTime) extends AnyVal {
+ def toSystemTimestamp = Timestamp.valueOf(offsetDateTime.atZoneSameInstant(ZoneId.systemDefault).toLocalDateTime)
+ }
+}
+
+trait MetadataStatement {
+ def addKeyValue(key: String, value: Any): Unit
+ def addEmptyValue(key: String): Unit
+}
+
+class MetadataStatementForWorkflow(preparedStatement: PreparedStatement, workflowId: String) extends MetadataStatement {
+ import MetadataStatement._
+
+ val offsetDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZZZZ")
+ val logger = LoggerFactory.getLogger("LiquibaseMetadataMigration")
+ val dawn = OffsetDateTime.of(0, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toSystemTimestamp
+ var batchSizeCounter: Int = 0
+
+ private def metadataType(value: Any) = {
+ value match {
+      case WdlInteger(_) => "int"
+      case WdlFloat(_) => "number"
+      case WdlBoolean(_) => "boolean"
+      case _: WdlValue => "string"
+      case _: Int | _: Long => "int"
+      case _: Double | _: Float => "number"
+      case _: Boolean => "boolean"
+      case _ => "string"
+ }
+ }
+
+ private def metadataValue(value: Any) = {
+ value match {
+ case v: WdlValue => v.valueString
+ case v => v.toString
+ }
+ }
+
+ protected def setStatement() = {
+ preparedStatement.setString(MetadataStatement.WorkflowIdIdx, workflowId)
+ preparedStatement.setNull(MetadataStatement.CallFqnIdx, Types.VARCHAR)
+ preparedStatement.setNull(MetadataStatement.CallIndexIdx, Types.INTEGER)
+ preparedStatement.setNull(MetadataStatement.CallAttemptIdx, Types.INTEGER)
+ }
+
+ protected def addDataAndBatch(key: String, value: Any) = {
+ preparedStatement.setString(MetadataStatement.KeyIdx, key)
+
+ // Set the value and type
+ value match {
+ case null =>
+ preparedStatement.setNull(MetadataStatement.ValueIdx, Types.VARCHAR)
+ preparedStatement.setNull(MetadataStatement.ValueTypeIdx, Types.VARCHAR) // Null values have null type
+ case _ =>
+ preparedStatement.setString(MetadataStatement.ValueIdx, metadataValue(value))
+ preparedStatement.setString(MetadataStatement.ValueTypeIdx, metadataType(value))
+ }
+
+ preparedStatement.addBatch()
+ }
+
+ private def add(key: String, value: Any, errorMessage: String) = try {
+ setStatement()
+ addDataAndBatch(key, value)
+ } catch {
+ case t: Throwable => logger.warn(errorMessage, t)
+ }
+
+ /** Adds a non-null value to the metadata journal. */
+ override def addKeyValue(key: String, value: Any) = {
+ if (value != null) {
+ preparedStatement.setTimestamp(MetadataStatement.TimestampIdx, OffsetDateTime.now().toSystemTimestamp)
+ add(key, value, s"Failed to migrate metadata value $value with key $key for workflow $workflowId")
+ }
+ }
+
+ override def addEmptyValue(key: String): Unit = {
+ preparedStatement.setTimestamp(MetadataStatement.TimestampIdx, dawn)
+ add(key, null, s"Failed to add empty value with key $key for workflow $workflowId")
+ }
+}
+
+class MetadataStatementForCall(preparedStatement: PreparedStatement, workflowId: String, callFqn: String, index: Int, attempt: Int) extends MetadataStatementForWorkflow(preparedStatement, workflowId) {
+ override def setStatement() = {
+ preparedStatement.setString(MetadataStatement.WorkflowIdIdx, workflowId)
+ preparedStatement.setString(MetadataStatement.CallFqnIdx, callFqn)
+ preparedStatement.setInt(MetadataStatement.CallIndexIdx, index)
+ preparedStatement.setInt(MetadataStatement.CallAttemptIdx, attempt)
+ }
+}
\ No newline at end of file
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/QueryPaginator.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/QueryPaginator.scala
new file mode 100644
index 000000000..e0d90d3d4
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/QueryPaginator.scala
@@ -0,0 +1,21 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.{PreparedStatement, ResultSet}
+
+import liquibase.database.jvm.JdbcConnection
+
+class QueryPaginator(statement: PreparedStatement,
+ batchSize: Int,
+ count: Int) extends Iterator[ResultSet] {
+ var cursor = 0
+
+ def next(): ResultSet = {
+ statement.setInt(1, cursor)
+ statement.setInt(2, cursor + batchSize)
+
+ cursor += batchSize
+ statement.executeQuery()
+ }
+
+  def hasNext: Boolean = cursor <= count
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/ResultSetIterator.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/ResultSetIterator.scala
new file mode 100644
index 000000000..8658a59da
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/ResultSetIterator.scala
@@ -0,0 +1,8 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.ResultSet
+
+class ResultSetIterator(rs: ResultSet) extends Iterator[ResultSet] {
+ def hasNext: Boolean = rs.next()
+ def next(): ResultSet = rs
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/SymbolTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/SymbolTableMigration.scala
new file mode 100644
index 000000000..b1439430d
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/SymbolTableMigration.scala
@@ -0,0 +1,195 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.{PreparedStatement, ResultSet}
+
+import com.typesafe.config.ConfigFactory
+import cromwell.core.simpleton.WdlValueSimpleton._
+import cromwell.database.core.SqlConfiguration
+import cromwell.database.migration.WdlTransformation
+import lenthall.config.ScalaConfig._
+import liquibase.change.custom.CustomTaskChange
+import liquibase.database.Database
+import liquibase.database.jvm.JdbcConnection
+import liquibase.exception.{CustomChangeException, ValidationErrors}
+import liquibase.resource.ResourceAccessor
+import org.slf4j.LoggerFactory
+import wdl4s.WdlExpression
+import wdl4s.types.WdlType
+import wdl4s.values.WdlValue
+
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+
+object SymbolTableMigration {
+ val NbRowsQuery =
+ """
+ |SELECT MAX(TMP_SYMBOL_ID) AS symbol_count
+ |FROM TMP_SYMBOL;
+ """.stripMargin
+}
+
+trait SymbolTableMigration extends CustomTaskChange {
+ import SymbolTableMigration._
+ import cromwell.database.migration.WdlTransformation._
+
+ // Nb of rows to retrieve / process in a batch
+ val config = SqlConfiguration.defaultDatabaseConfig
+
+ /**
+ * Specify the size of a "page".
+ * For databases with a very large number of symbols, selecting all the rows at once can generate a variety of problems.
+ * In order to avoid any issue, the selection is paginated. This value sets how many rows should be retrieved and processed at a time, before asking for the next chunk.
+ */
+ val readBatchSize = config.getIntOr("migration.read-batch-size", 100000)
+
+ /**
+ * Because a symbol row can contain any arbitrary wdl value, the amount of metadata rows to insert from a single symbol row can vary from 1 to several thousands (or more).
+ * To keep the size of the insert batch from growing out of control we monitor its size and execute/commit when it reaches or exceeds writeBatchSize.
+ */
+ val writeBatchSize = config.getIntOr("migration.write-batch-size", 100000)
+ val logger = LoggerFactory.getLogger("LiquibaseMetadataMigration")
+
+
+
+ override def execute(database: Database): Unit = {
+ try {
+ val dbConn = database.getConnection.asInstanceOf[JdbcConnection]
+ val autoCommit = dbConn.getAutoCommit
+ dbConn.setAutoCommit(false)
+ migrate(dbConn)
+ dbConn.setAutoCommit(autoCommit)
+ } catch {
+ case t: CustomChangeException => throw t
+ case t: Throwable => throw new CustomChangeException(s"Could not apply migration script for metadata at ${getClass.getSimpleName}", t)
+ }
+ }
+
+ def tmpSymbolPaginatedStatement(connection: JdbcConnection): PreparedStatement = connection.prepareStatement("""
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | SYMBOL_NAME,
+ | SYMBOL_SCOPE,
+ | SYMBOL_INDEX,
+ | SYMBOL_ATTEMPT,
+ | WDL_TYPE,
+ | WDL_VALUE
+ | FROM TMP_SYMBOL
+ | WHERE TMP_SYMBOL_ID >= ? AND TMP_SYMBOL_ID < ?;
+ """.stripMargin)
+
+ private def migrate(connection: JdbcConnection) = {
+ logger.info(s"Running migration with a read batch size of $readBatchSize and a write batch size of $writeBatchSize")
+
+ /**
+ * Keep count of the size of the batch.
+ *
+ * @see writeBatchSize
+ */
+ var insertsCounter: Int = 0
+
+ // Find the max row id in the TMP_SYMBOL table
+ val tmpSymbolCountRS = connection.createStatement().executeQuery(NbRowsQuery)
+
+ if (tmpSymbolCountRS.next()) {
+ val tmpSymbolCount = tmpSymbolCountRS.getInt("symbol_count")
+
+ // So we can display progress
+ val nbPages = Math.max(tmpSymbolCount / readBatchSize, 1)
+
+ val paginator = new QueryPaginator(tmpSymbolPaginatedStatement(connection), readBatchSize, tmpSymbolCount)
+ val metadataInsertStatement = MetadataStatement.makeStatement(connection)
+
+ // Loop over pages
+ paginator.zipWithIndex foreach {
+ case (resultBatch, page) =>
+ // Loop over rows in page
+ new ResultSetIterator(resultBatch).zipWithIndex foreach {
+ case (row, idx) =>
+ insertsCounter += migrateRow(connection, metadataInsertStatement, row, idx)
+ // insertsCounter can actually be bigger than writeBatchSize as wdlValues are processed atomically, so this is a best effort
+ if (insertsCounter >= writeBatchSize) {
+ metadataInsertStatement.executeBatch()
+ connection.commit()
+ insertsCounter = 0
+ }
+ }
+
+ resultBatch.close()
+
+ val progress = Math.min((page + 1) * 100 / nbPages, 100)
+ logger.info(s"[${getClass.getSimpleName}] $progress%")
+ }
+
+ if (insertsCounter != 0) {
+ metadataInsertStatement.executeBatch()
+ connection.commit()
+ }
+ } else {
+ throw new CustomChangeException("Could not find max value of symbol id for pagination")
+ }
+ }
+
+ /**
+ * Migrate a row to the metadata table
+ */
+ protected def migrateRow(connection: JdbcConnection, statement: PreparedStatement, row: ResultSet, idx: Int): Int = {
+ // Try to coerce the value to a WdlValue
+ val value = for {
+ wdlType <- Try(WdlType.fromWdlString(row.getString("WDL_TYPE")))
+ inflated <- row.getString("WDL_VALUE") match {
+ case null => Success("") // Empty Strings are null in the DB
+      case nonNull => inflate(nonNull)
+ }
+ } yield WdlTransformation.coerceStringToWdl(inflated, wdlType)
+
+ val workflowUuid = row.getString("WORKFLOW_EXECUTION_UUID")
+ val symbolName = row.getString("SYMBOL_NAME")
+ val symbolScope = row.getString("SYMBOL_SCOPE")
+ // getInt returns 0 if value is null so use getString instead and convert after
+ val symbolIndex = Option(row.getString("SYMBOL_INDEX")) map { _.toInt }
+ val symbolAttempt = Option(row.getString("SYMBOL_ATTEMPT")) map { _.toInt }
+
+ value match {
+ case Success(wdlValue) =>
+ processSymbol(statement, idx, workflowUuid, symbolName, symbolScope, symbolIndex, symbolAttempt, wdlValue)
+ case Failure(f) =>
+ logger.error(
+ s"""Could not parse symbol of type ${row.getString("WDL_TYPE")}
+ |for Workflow $workflowUuid - Call $symbolScope:$symbolIndex""".stripMargin, f)
+ 0
+ }
+ }
+
+ def processSymbol(statement: PreparedStatement,
+ idx: Int,
+ workflowUuid: String,
+ symbolName: String,
+ symbolScope: String,
+ symbolIndex: Option[Int],
+ symbolAttempt: Option[Int],
+ wdlValue: WdlValue): Int
+
+ override def setUp(): Unit = ()
+
+ override def validate(database: Database): ValidationErrors = new ValidationErrors
+
+ override def setFileOpener(resourceAccessor: ResourceAccessor): Unit = {}
+
+ /**
+ * Add all necessary statements to the batch for the provided WdlValue.
+ */
+ protected final def addWdlValue(metadataKey: String, wdlValue: WdlValue, metadataStatementForCall: MetadataStatement): Int = {
+ wdlValue match {
+ // simplify doesn't handle WdlExpression
+ case expr: WdlExpression =>
+ metadataStatementForCall.addKeyValue(metadataKey, expr.valueString)
+ 1
+ case value =>
+ val simplified = value.simplify(metadataKey)
+ simplified foreach { simpleton =>
+ metadataStatementForCall.addKeyValue(simpleton.simpletonKey, simpleton.simpletonValue)
+ }
+ simplified.size
+ }
+ }
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/WorkflowOutputSymbolTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/WorkflowOutputSymbolTableMigration.scala
new file mode 100644
index 000000000..f188fc0cd
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/symbol/WorkflowOutputSymbolTableMigration.scala
@@ -0,0 +1,22 @@
+package cromwell.database.migration.metadata.table.symbol
+
+import java.sql.PreparedStatement
+
+import wdl4s.values._
+
+class WorkflowOutputSymbolTableMigration extends SymbolTableMigration {
+
+ override def processSymbol(statement: PreparedStatement,
+ rowIndex: Int,
+ workflowUuid: String,
+ symbolName: String,
+ symbolScope: String,
+ symbolIndex: Option[Int],
+ symbolAttempt: Option[Int],
+ wdlValue: WdlValue): Int = {
+ val metadataStatementForWorkflow = new MetadataStatementForWorkflow(statement, workflowUuid)
+ addWdlValue(s"outputs:$symbolScope.$symbolName", wdlValue, metadataStatementForWorkflow)
+ }
+
+ override def getConfirmationMessage: String = "Workflow outputs from Symbol Table migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/metadata/table/workflowexecution/WorkflowExecutionTableMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/workflowexecution/WorkflowExecutionTableMigration.scala
new file mode 100644
index 000000000..411f596e9
--- /dev/null
+++ b/database/migration/src/main/scala/cromwell/database/migration/metadata/table/workflowexecution/WorkflowExecutionTableMigration.scala
@@ -0,0 +1,112 @@
+package cromwell.database.migration.metadata.table.workflowexecution
+
+import cromwell.database.migration.metadata.MetadataCustomSql
+import MetadataCustomSql._
+
+class WorkflowExecutionTableMigration extends MetadataCustomSql {
+
+ override def queries: Array[String] = {
+ Array(
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ |WORKFLOW_EXECUTION_UUID,
+ |METADATA_KEY,
+ |METADATA_VALUE,
+ |METADATA_VALUE_TYPE,
+ |METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | 'submission',
+ | DATE_FORMAT(START_DT, '%Y-%m-%dT%T.%f$Offset'),
+ | 'string',
+ | NOW()
+ |FROM WORKFLOW_EXECUTION
+ |WHERE START_DT IS NOT NULL;""".stripMargin
+ ,
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ |WORKFLOW_EXECUTION_UUID,
+ |METADATA_KEY,
+ |METADATA_VALUE,
+ |METADATA_VALUE_TYPE,
+ |METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | 'start',
+ | DATE_FORMAT(START_DT, '%Y-%m-%dT%T.%f$Offset'),
+ | 'string',
+ | NOW()
+ |FROM WORKFLOW_EXECUTION
+ |WHERE START_DT IS NOT NULL;""".stripMargin
+ ,
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ |WORKFLOW_EXECUTION_UUID,
+ |METADATA_KEY,
+ |METADATA_VALUE,
+ |METADATA_VALUE_TYPE,
+ |METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | 'end',
+ | DATE_FORMAT(END_DT, '%Y-%m-%dT%T.%f$Offset'),
+ | 'string',
+ | NOW()
+ |FROM WORKFLOW_EXECUTION
+ |WHERE END_DT IS NOT NULL;""".stripMargin
+ ,
+ s"""
+ |INSERT INTO METADATA_JOURNAL (
+ |WORKFLOW_EXECUTION_UUID,
+ |METADATA_KEY,
+ |METADATA_VALUE,
+ |METADATA_VALUE_TYPE,
+ |METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | 'status',
+ | STATUS,
+ | 'string',
+ | NOW()
+ |FROM WORKFLOW_EXECUTION;""".stripMargin
+ ,
+ """
+ |INSERT INTO METADATA_JOURNAL (
+ |WORKFLOW_EXECUTION_UUID,
+ |METADATA_KEY,
+ |METADATA_VALUE,
+ |METADATA_VALUE_TYPE,
+ |METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | 'workflowName',
+ | WORKFLOW_NAME,
+ | 'string',
+ | NOW()
+ |FROM WORKFLOW_EXECUTION;""".stripMargin
+ ,
+ """
+ |INSERT INTO METADATA_JOURNAL (
+ |WORKFLOW_EXECUTION_UUID,
+ |METADATA_KEY,
+ |METADATA_VALUE,
+ |METADATA_VALUE_TYPE,
+ |METADATA_TIMESTAMP
+ |)
+ |SELECT
+ | WORKFLOW_EXECUTION_UUID,
+ | 'outputs',
+ | NULL,
+ | NULL,
+        | '1900-01-01 00:00:00.000000'
+ |FROM WORKFLOW_EXECUTION;""".stripMargin
+ )
+ }
+
+ override def getConfirmationMessage: String = "Workflow Execution Table migration complete."
+}
diff --git a/database/migration/src/main/scala/cromwell/database/migration/restart/table/JobStoreSimpletonMigration.scala b/database/migration/src/main/scala/cromwell/database/migration/restart/table/JobStoreSimpletonMigration.scala
index 3e548c16d..c48e76943 100644
--- a/database/migration/src/main/scala/cromwell/database/migration/restart/table/JobStoreSimpletonMigration.scala
+++ b/database/migration/src/main/scala/cromwell/database/migration/restart/table/JobStoreSimpletonMigration.scala
@@ -1,24 +1,14 @@
package cromwell.database.migration.restart.table
-import java.io.{ByteArrayInputStream, IOException}
-import java.util.zip.GZIPInputStream
-
import cromwell.core.simpleton.WdlValueSimpleton._
-import liquibase.change.custom.CustomTaskChange
-import liquibase.database.Database
+import cromwell.database.migration.WdlTransformation._
import liquibase.database.jvm.JdbcConnection
-import liquibase.exception.ValidationErrors
-import liquibase.resource.ResourceAccessor
-import org.apache.commons.codec.binary.Base64
-import org.apache.commons.io.IOUtils
-import wdl4s.types.{WdlPrimitiveType, WdlType}
+import wdl4s.types.WdlType
import scala.language.postfixOps
-import scala.util.Try
class JobStoreSimpletonMigration extends AbstractRestartMigration {
-
override val description = "WORKFLOW_EXECUTION + EXECUTION + SYMBOL + JOB_STORE -> JOB_STORE_RESULT_SIMPLETON"
// GOTC (substituting COUNT(*) for the projection): 1 row in set (5.22 sec)
@@ -52,14 +42,6 @@ class JobStoreSimpletonMigration extends AbstractRestartMigration {
VALUES(?, ?, ?, ?)
"""
- private def inflate(value: String) = {
- Try {
- IOUtils.toString(new GZIPInputStream(new ByteArrayInputStream(Base64.decodeBase64(value))))
- } recover {
- case e: IOException => value
- } get
- }
-
protected def doMigration(connection: JdbcConnection): Unit = {
val query = connection.createStatement()
lazy val insert = connection.prepareStatement(InsertJobStoreSimpleton)
@@ -68,11 +50,8 @@ class JobStoreSimpletonMigration extends AbstractRestartMigration {
while (results.next()) {
val wdlType = WdlType.fromWdlString(results.getString(4))
val rawValue = results.getString(3)
- val inflated = inflate(rawValue)
- val wdlValue = wdlType match {
- case p: WdlPrimitiveType => p.coerceRawValue(inflated).get
- case o => o.fromWdlString(inflated)
- }
+ val inflated = inflate(rawValue).get
+ val wdlValue = coerceStringToWdl(inflated, wdlType)
val name = results.getString(2)
val simpletons = wdlValue.simplify(name)
simpletons foreach { s =>
diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingResultMetaInfoComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingResultMetaInfoComponent.scala
index 5f39ceeec..8c8faf3c5 100644
--- a/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingResultMetaInfoComponent.scala
+++ b/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingResultMetaInfoComponent.scala
@@ -11,7 +11,7 @@ trait CallCachingResultMetaInfoComponent {
class CallCachingResultMetaInfoEntries(tag: Tag) extends Table[CallCachingResultMetaInfoEntry](tag, "CALL_CACHING_RESULT_METAINFO") {
def callCachingResultMetaInfoId = column[Int]("CALL_CACHING_RESULT_METAINFO_ID", O.PrimaryKey, O.AutoInc)
- def workflowUuid = column[String]("WORKFLOW_UUID")
+ def workflowUuid = column[String]("WORKFLOW_EXECUTION_UUID")
def callFqn = column[String]("CALL_FQN")
def returnCode = column[Option[Int]]("RETURN_CODE")
def scatterIndex = column[Int]("JOB_SCATTER_INDEX")
diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/JobStoreComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/JobStoreComponent.scala
index c42318c28..63e25b2ad 100644
--- a/database/sql/src/main/scala/cromwell/database/slick/tables/JobStoreComponent.scala
+++ b/database/sql/src/main/scala/cromwell/database/slick/tables/JobStoreComponent.scala
@@ -10,7 +10,7 @@ trait JobStoreComponent {
class JobStoreEntries(tag: Tag) extends Table[JobStoreEntry](tag, "JOB_STORE") {
def jobStoreId = column[Int]("JOB_STORE_ID", O.PrimaryKey, O.AutoInc)
- def workflowUuid = column[String]("WORKFLOW_UUID")
+ def workflowUuid = column[String]("WORKFLOW_EXECUTION_UUID")
def callFqn = column[String]("CALL_FQN")
def scatterIndex = column[Int]("JOB_SCATTER_INDEX")
def attempt = column[Int]("JOB_RETRY_ATTEMPT")
diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataComponent.scala
index ebcc2f44e..973dd54ff 100644
--- a/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataComponent.scala
+++ b/database/sql/src/main/scala/cromwell/database/slick/tables/MetadataComponent.scala
@@ -16,9 +16,9 @@ trait MetadataComponent {
def metadataId = column[Long]("METADATA_JOURNAL_ID", O.PrimaryKey, O.AutoInc)
def workflowExecutionUuid = column[String]("WORKFLOW_EXECUTION_UUID")
def key = column[String]("METADATA_KEY")
- def callFqn = column[Option[String]]("METADATA_CALL_FQN")
- def index = column[Option[Int]]("METADATA_CALL_INDEX")
- def attempt = column[Option[Int]]("METADATA_CALL_ATTEMPT")
+ def callFqn = column[Option[String]]("CALL_FQN")
+ def index = column[Option[Int]]("JOB_SCATTER_INDEX")
+ def attempt = column[Option[Int]]("JOB_RETRY_ATTEMPT")
def value = column[Option[String]]("METADATA_VALUE")
def valueType = column[Option[String]]("METADATA_VALUE_TYPE")
def timestamp = column[Timestamp]("METADATA_TIMESTAMP")
diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreComponent.scala
index 7c64fb232..779f6cb0f 100644
--- a/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreComponent.scala
+++ b/database/sql/src/main/scala/cromwell/database/slick/tables/WorkflowStoreComponent.scala
@@ -12,7 +12,7 @@ trait WorkflowStoreComponent {
class WorkflowStoreEntries(tag: Tag) extends Table[WorkflowStoreEntry](tag, "WORKFLOW_STORE") {
def workflowStoreTableId = column[Int]("WORKFLOW_STORE_ID", O.PrimaryKey, O.AutoInc)
- def workflowUuid = column[String]("WORKFLOW_UUID")
+ def workflowUuid = column[String]("WORKFLOW_EXECUTION_UUID")
def workflowDefinition = column[Clob]("WORKFLOW_DEFINITION")
def workflowInputs = column[Clob]("WORKFLOW_INPUTS")
def workflowOptions = column[Clob]("WORKFLOW_OPTIONS")
diff --git a/database/sql/src/main/scala/cromwell/database/sql/SqlDatabase.scala b/database/sql/src/main/scala/cromwell/database/sql/SqlDatabase.scala
index e4aecb622..117459fd4 100644
--- a/database/sql/src/main/scala/cromwell/database/sql/SqlDatabase.scala
+++ b/database/sql/src/main/scala/cromwell/database/sql/SqlDatabase.scala
@@ -2,7 +2,7 @@ package cromwell.database.sql
import java.sql.Connection
import java.util.UUID
-import lenthall.config.ScalaConfig._
+
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
trait SqlDatabase extends AutoCloseable
@@ -21,12 +21,6 @@ trait SqlDatabase extends AutoCloseable
}
object SqlDatabase {
- lazy val rootConfig = ConfigFactory.load()
- private lazy val rootDatabaseConfig = rootConfig.getConfig("database")
- private lazy val databaseConfigName = rootDatabaseConfig.getStringOption("config")
- lazy val defaultDatabaseConfig = databaseConfigName.map(getDatabaseConfig).getOrElse(rootDatabaseConfig)
-
- def getDatabaseConfig(path: String) = rootDatabaseConfig.getConfig(path)
/**
* Modifies config.getString("url") to return a unique schema, if the original url contains the text
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 1c6d5b194..20792bbe9 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -78,12 +78,14 @@ object Dependencies {
val gcsFileSystemDependencies = baseDependencies ++ googleApiClientDependencies ++ googleCloudDependencies
- val databaseSqlDependencies = baseDependencies ++ slickDependencies ++ dbmsDependencies
+ val databaseCoreDependencies = baseDependencies
+
+ val databaseSqlDependencies = slickDependencies ++ dbmsDependencies
val databaseMigrationDependencies = List(
"org.broadinstitute" %% "wdl4s" % wdl4sV, // Used in migration scripts
"com.github.pathikrit" %% "better-files" % betterFilesV % Test
- ) ++ baseDependencies ++ liquibaseDependencies ++ dbmsDependencies
+ ) ++ liquibaseDependencies ++ dbmsDependencies
val coreDependencies = List(
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
diff --git a/project/Settings.scala b/project/Settings.scala
index b3bbf8c09..413ab3a41 100644
--- a/project/Settings.scala
+++ b/project/Settings.scala
@@ -62,6 +62,11 @@ object Settings {
libraryDependencies ++= gcsFileSystemDependencies
) ++ commonSettings
+ val databaseCoreSettings = List(
+ name := "cromwell-database-core",
+ libraryDependencies ++= databaseCoreDependencies
+ ) ++ commonSettings
+
val databaseSqlSettings = List(
name := "cromwell-database-sql",
libraryDependencies ++= databaseSqlDependencies
diff --git a/services/src/main/scala/cromwell/services/ServicesStore.scala b/services/src/main/scala/cromwell/services/ServicesStore.scala
index f3ebfd4c2..2dc388ada 100644
--- a/services/src/main/scala/cromwell/services/ServicesStore.scala
+++ b/services/src/main/scala/cromwell/services/ServicesStore.scala
@@ -1,5 +1,6 @@
package cromwell.services
+import cromwell.database.core.SqlConfiguration
import cromwell.database.migration.liquibase.LiquibaseUtils
import cromwell.database.slick.SlickDatabase
import cromwell.database.sql.SqlDatabase
@@ -30,5 +31,5 @@ object SingletonServicesStore {
import ServicesStore.EnhancedSqlDatabase
- val databaseInterface: SqlDatabase = new SlickDatabase(SqlDatabase.defaultDatabaseConfig).initialized
+ val databaseInterface: SqlDatabase = new SlickDatabase(SqlConfiguration.defaultDatabaseConfig).initialized
}
diff --git a/services/src/test/scala/cromwell/services/ServicesStoreSpec.scala b/services/src/test/scala/cromwell/services/ServicesStoreSpec.scala
index 4430c5a9e..8f9e906fc 100644
--- a/services/src/test/scala/cromwell/services/ServicesStoreSpec.scala
+++ b/services/src/test/scala/cromwell/services/ServicesStoreSpec.scala
@@ -6,6 +6,7 @@ import java.sql.Connection
import better.files._
import com.typesafe.config.ConfigFactory
import cromwell.core.Tags._
+import cromwell.database.core.SqlConfiguration
import cromwell.database.migration.liquibase.LiquibaseUtils
import cromwell.database.slick.SlickDatabase
import cromwell.database.sql.SqlDatabase
@@ -111,7 +112,7 @@ class ServicesStoreSpec extends FlatSpec with Matchers with ScalaFutures with St
def testWith(configPath: String): Unit = {
import ServicesStore.EnhancedSqlDatabase
- lazy val databaseConfig = SqlDatabase.getDatabaseConfig(configPath)
+ lazy val databaseConfig = SqlConfiguration.getDatabaseConfig(configPath)
lazy val dataAccess = new SlickDatabase(databaseConfig).initialized
it should "(if hsqldb) have transaction isolation mvcc" taggedAs DbmsTest in {
diff --git a/services/src/test/scala/cromwell/services/metadata/impl/MetadataDatabaseAccessSpec.scala b/services/src/test/scala/cromwell/services/metadata/impl/MetadataDatabaseAccessSpec.scala
index 7e1e76a4c..dc3346327 100644
--- a/services/src/test/scala/cromwell/services/metadata/impl/MetadataDatabaseAccessSpec.scala
+++ b/services/src/test/scala/cromwell/services/metadata/impl/MetadataDatabaseAccessSpec.scala
@@ -4,6 +4,7 @@ import java.time.OffsetDateTime
import cromwell.core.Tags.DbmsTest
import cromwell.core._
+import cromwell.database.core.SqlConfiguration
import cromwell.database.slick.SlickDatabase
import cromwell.database.sql.SqlDatabase
import cromwell.services.ServicesStore
@@ -40,7 +41,7 @@ class MetadataDatabaseAccessSpec extends FlatSpec with Matchers with ScalaFuture
import ServicesStore.EnhancedSqlDatabase
lazy val dataAccess: MetadataDatabaseAccess = new MetadataDatabaseAccess with ServicesStore {
- override val databaseInterface = new SlickDatabase(SqlDatabase.getDatabaseConfig(configPath)).initialized
+ override val databaseInterface = new SlickDatabase(SqlConfiguration.getDatabaseConfig(configPath)).initialized
}
def publishMetadataEvents(baseKey: MetadataKey, keyValues: Array[(String, String)]): Future[Unit] = {