diff --git a/bot/src/main/java/org/openjdk/skara/bot/BotRunner.java b/bot/src/main/java/org/openjdk/skara/bot/BotRunner.java index ae7a6dd96..eb2df0d32 100644 --- a/bot/src/main/java/org/openjdk/skara/bot/BotRunner.java +++ b/bot/src/main/java/org/openjdk/skara/bot/BotRunner.java @@ -58,7 +58,7 @@ enum TaskPhases { END } - private AtomicInteger workIdCounter = new AtomicInteger(); + private final AtomicInteger workIdCounter = new AtomicInteger(); private class RunnableWorkItem implements Runnable { private static final Counter.WithThreeLabels EXCEPTIONS_COUNTER = @@ -71,6 +71,11 @@ private class RunnableWorkItem implements Runnable { Gauge.name("skara_runner_allocated_bytes").labels("bot", "work_item").register(); private final WorkItem item; + private final int workId = workIdCounter.incrementAndGet(); + private final Instant createTime = Instant.now(); + // This gets updated by the watchdog when a timeout occurs to avoid + // repeating the timeout log messages too often. + private Instant timeoutWarningTime = createTime; RunnableWorkItem(WorkItem wrappedItem) { item = wrappedItem; @@ -179,7 +184,7 @@ private void runMeasured() { Collection followUpItems = null; try (var __ = new LogContext(Map.of("work_item", item.toString(), - "work_id", String.valueOf(workIdCounter.incrementAndGet())))) { + "work_id", String.valueOf(workId)))) { log.log(Level.FINE, "Executing item " + item + " on repository " + scratchPath, TaskPhases.BEGIN); try { followUpItems = item.run(scratchPath); @@ -225,8 +230,9 @@ private void runMeasured() { if (maySubmit) { pending.remove(candidate); - executor.submit(new RunnableWorkItem(candidate)); - active.put(candidate, Instant.now()); + RunnableWorkItem runnableWorkItem = new RunnableWorkItem(candidate); + executor.submit(runnableWorkItem); + active.put(candidate, runnableWorkItem); log.finer("Submitting candidate: " + candidate); } } @@ -234,8 +240,10 @@ private void runMeasured() { } } + // Mapping of pending items to the active item preventing them from running private final Map> pending; - private final Map active; + // Mapping of active WorkItem to their RunnableWorkItem + private final Map active; private final Deque scratchPaths; private static final Counter.WithTwoLabels SCHEDULED_COUNTER = @@ -249,13 +257,13 @@ private void submitOrSchedule(WorkItem item) { for (var activeItem : active.keySet()) { if (!activeItem.concurrentWith(item)) { - for (var pendingItem : pending.entrySet()) { + for (var pendingItem : pending.keySet()) { // If there are pending items of the same type that we cannot run concurrently with, replace them. - if (item.replaces(pendingItem.getKey())) { - log.finer("Discarding obsoleted item " + pendingItem.getKey() + + if (item.replaces(pendingItem)) { + log.finer("Discarding obsoleted item " + pendingItem + " in favor of item " + item); DISCARDED_COUNTER.labels(item.botName(), item.workItemName()).inc(); - pending.remove(pendingItem.getKey()); + pending.remove(pendingItem); // There can't be more than one break; } @@ -266,8 +274,9 @@ private void submitOrSchedule(WorkItem item) { } } - executor.submit(new RunnableWorkItem(item)); - active.put(item, Instant.now()); + RunnableWorkItem runnableWorkItem = new RunnableWorkItem(item); + executor.submit(runnableWorkItem); + active.put(item, runnableWorkItem); } } @@ -370,13 +379,14 @@ private void checkPeriodicItems() { private void itemWatchdog() { synchronized (executor) { - for (var activeItem : active.entrySet()) { - var activeDuration = Duration.between(activeItem.getValue(), Instant.now()); - if (activeDuration.compareTo(watchdogWarnTimeout) > 0) { - log.severe("Item " + activeItem.getKey() + " has been active more than " + activeDuration + - " - this may be an error!"); + for (var activeRunnableItem : active.values()) { + Instant now = Instant.now(); + var timeoutDuration = Duration.between(activeRunnableItem.timeoutWarningTime, now); + if (timeoutDuration.compareTo(watchdogWarnTimeout) > 0) { + log.severe("Item " + activeRunnableItem.item + " with workId " + activeRunnableItem.workId + " has been active more than " + + Duration.between(activeRunnableItem.createTime, now) + " - this may be an error!"); // Reset the counter to avoid continuous reporting - once every watchdogTimeout is enough - activeItem.setValue(Instant.now()); + activeRunnableItem.timeoutWarningTime = now; } } // Inform the global watchdog that the scheduler is still executing items