From fc31f14abf7b46680ec94014dd877e2e1f01958a Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Thu, 9 Jun 2016 12:32:35 -0400
Subject: [PATCH] FLUME-2920 Don't commit offsets when shutting down consumer.
 Remove some dead test code

---
 .../flume/channel/kafka/KafkaChannel.java     |  4 -
 .../flume/channel/kafka/TestKafkaChannel.java | 82 ++++++++++++++++---
 2 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 2d9b0c6c18..9a829ca653 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -267,10 +267,6 @@ private synchronized ConsumerAndRecords createConsumerAndRecords() {
   }
 
   private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
-    if (c.failedEvents.isEmpty()) {
-      c.commitOffsets();
-    }
-    c.failedEvents.clear();
     c.consumer.close();
   }
 
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 637428d120..05381c73ec 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -32,26 +32,42 @@
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
 
 public class TestKafkaChannel {
 
-  private final static Logger LOGGER =
-      LoggerFactory.getLogger(TestKafkaChannel.class);
-
   private static TestUtil testUtil = TestUtil.getInstance();
   private String topic = null;
   private final Set<String> usedTopics = new HashSet<String>();
-  private CountDownLatch latch = null;
 
   @BeforeClass
   public static void setupClass() throws Exception {
@@ -74,7 +90,6 @@ public void setup() throws Exception {
     } catch (Exception e) {
     }
     Thread.sleep(2500);
-    latch = new CountDownLatch(5);
   }
 
   @AfterClass
@@ -186,6 +201,49 @@ public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception {
     doParseAsFlumeEventFalseAsSource(true);
   }
 
+  @Test
+  public void testOffsetsNotCommittedOnStop() throws Exception {
+    String message = "testOffsetsNotCommittedOnStop-" + System.nanoTime();
+
+    KafkaChannel channel = startChannel(false);
+
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(channel.getProducerProps());
+    ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes());
+    producer.send(data).get();
+    producer.flush();
+    producer.close();
+
+    Event event = takeEventWithoutCommittingTxn(channel);
+    Assert.assertNotNull(event);
+    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
+
+    // Stop the channel without committing the transaction
+    channel.stop();
+
+    channel = startChannel(false);
+
+    // Message should still be available
+    event = takeEventWithoutCommittingTxn(channel);
+    Assert.assertNotNull(event);
+    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
+  }
+
+  private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
+    for (int i = 0; i < 5; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+
+      Event event = channel.take();
+      if (event != null) {
+        return event;
+      } else {
+        txn.commit();
+        txn.close();
+      }
+    }
+    return null;
+  }
+
   private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
     Properties props = channel.getProducerProps();
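Reviewer note (not part of the patch): the fix relies on Kafka's manual offset-commit contract. With enable.auto.commit=false, KafkaConsumer.close() does not advance the group's committed offsets, so records taken inside an uncommitted Flume transaction are re-delivered after a restart. The deleted code committed offsets in decommissionConsumerAndRecords(), which could silently drop events that had been taken but not yet committed when the channel stopped. Below is a minimal standalone sketch of that contract against the plain kafka-clients consumer API; the broker address localhost:9092, the group id, and the topic name "demo" are illustrative assumptions, not values from the patch.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
    props.put("group.id", "manual-commit-demo");      // hypothetical group id
    props.put("enable.auto.commit", "false");         // offsets move only when we say so
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
    consumer.subscribe(Collections.singletonList("demo")); // hypothetical topic

    try {
      ConsumerRecords<String, byte[]> records = consumer.poll(1000);
      for (ConsumerRecord<String, byte[]> record : records) {
        // Hand the record off; in the channel this is the Flume transaction.
        System.out.println(record.key() + " -> " + new String(record.value()));
      }
      // Commit only after the hand-off succeeds. If the process stops before
      // this line, the group's committed offset is unchanged and a restarted
      // consumer re-reads the same records instead of silently dropping them.
      consumer.commitSync();
    } finally {
      // close() without commitSync() must not advance offsets; that is the
      // behaviour the patch restores by deleting commitOffsets() from
      // decommissionConsumerAndRecords().
      consumer.close();
    }
  }
}

The channel follows the same rule after this patch: offsets are committed only on Transaction.commit(), never as a side effect of shutdown, which is exactly what testOffsetsNotCommittedOnStop() asserts.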