From b2eb61dca1204afb317bd40346065aa6a0e97647 Mon Sep 17 00:00:00 2001
From: ricky
Date: Fri, 25 Sep 2015 14:15:09 -0400
Subject: [PATCH 1/3] NIFI-997: Periodically Renew Kerberos Tickets

- Renew ticket every 4 hours to avoid inactive Kerberos tickets.

---
 .../hadoop/AbstractHadoopProcessor.java       | 52 +++++++++++++++----
 1 file changed, 42 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 0102b1f91500..a11aa156430e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -50,6 +50,7 @@
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.Tuple;
@@ -136,6 +137,9 @@ public ValidationResult validate(String subject, String input, ValidationContext
 
     private static final Object RESOURCES_LOCK = new Object();
 
+    private static final long TICKET_RENEWAL_THRESHOLD_SEC = 4 * 3600; // renew Kerberos ticket after 4 hours
+    private long lastTicketRenewal;
+
     static {
         List<PropertyDescriptor> props = new ArrayList<>();
         props.add(HADOOP_CONFIGURATION_RESOURCES);
@@ -154,12 +158,12 @@ public ValidationResult validate(String subject, String input, ValidationContext
     }
 
     // variables shared by all threads of this processor
-    // Hadoop Configuration and FileSystem
-    private final AtomicReference<Tuple<Configuration, FileSystem>> hdfsResources = new AtomicReference<>();
+    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
+    private final AtomicReference<Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>> hdfsResources = new AtomicReference<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
-        hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
+        hdfsResources.set(new Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>(null, null));
     }
 
     @Override
@@ -173,7 +177,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
     @OnScheduled
     public final void abstractOnScheduled(ProcessContext context) throws IOException {
         try {
-            Tuple<Configuration, FileSystem> resources = hdfsResources.get();
+            Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>> resources = hdfsResources.get();
             if (resources.getKey() == null || resources.getValue() == null) {
                 String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
                 String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
@@ -183,14 +187,14 @@ public final void abstractOnScheduled(ProcessContext context) throws IOException
             }
         } catch (IOException ex) {
             getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
-            hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
+            hdfsResources.set(new Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>(null, null));
             throw ex;
         }
     }
 
     @OnStopped
     public final void abstractOnStopped() {
-        hdfsResources.set(new Tuple<Configuration, FileSystem>(null, null));
+        hdfsResources.set(new Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>(null, null));
     }
 
     private static Configuration getConfigurationFromResources(String configResources) throws IOException {
@@ -224,7 +228,7 @@ private static Configuration getConfigurationFromResource
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+    Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
         // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
         // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
         // NarThreadContextClassLoader.
@@ -244,13 +248,15 @@ Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, Stri
             // If kerberos is enabled, create the file system as the kerberos principal
             // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time
            FileSystem fs = null;
+            UserGroupInformation ugi = null;
             synchronized (RESOURCES_LOCK) {
                 if (config.get("hadoop.security.authentication").equalsIgnoreCase("kerberos")) {
                     String principal = context.getProperty(KERBEROS_PRINCIPAL).getValue();
                     String keyTab = context.getProperty(KERBEROS_KEYTAB).getValue();
                     UserGroupInformation.setConfiguration(config);
-                    UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
+                    ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTab);
                     fs = getFileSystemAsUser(config, ugi);
+                    lastTicketRenewal = System.currentTimeMillis() / 1000;
                 } else {
                     config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                     config.set("hadoop.security.authentication", "simple");
@@ -260,7 +266,7 @@ Tuple<Configuration, FileSystem> resetHDFSResources(String configResources, Stri
             config.set(disableCacheName, "true");
             getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                     new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
-            return new Tuple<>(config, fs);
+            return new Tuple<>(config, new Tuple<>(fs, ugi));
 
         } finally {
             Thread.currentThread().setContextClassLoader(savedClassLoader);
@@ -396,6 +402,32 @@ protected Configuration getConfiguration() {
     }
 
     protected FileSystem getFileSystem() {
-        return hdfsResources.get().getValue();
+        // if kerberos is enabled, check if the ticket should be renewed before returning the FS
+        if (getUserGroupInformation() != null && isTicketOld()) {
+            renewKerberosTicket(hdfsResources.get().getValue().getValue());
+        }
+        return hdfsResources.get().getValue().getKey();
+    }
+
+    protected UserGroupInformation getUserGroupInformation() {
+        return hdfsResources.get().getValue().getValue();
+    }
+
+    protected void renewKerberosTicket(UserGroupInformation ugi) {
+        try {
+            getLogger().info(String.format("Kerberos ticket age exceeds threshold [%d seconds], " +
+                    "attempting to renew ticket for user [%s]",
+                    TICKET_RENEWAL_THRESHOLD_SEC, ugi.getUserName()));
+            ugi.checkTGTAndReloginFromKeytab();
+            lastTicketRenewal = System.currentTimeMillis() / 1000;
+            getLogger().info("Kerberos ticket successfully renewed!");
+        } catch (IOException e) {
+            getLogger().error("Failed to renew Kerberos ticket\n" + e.getMessage());
+            throw new ProcessException("Unable to renew kerberos ticket\n" + e.getMessage());
+        }
+    }
+
+    protected boolean isTicketOld() {
+        return (System.currentTimeMillis() / 1000 - lastTicketRenewal) > TICKET_RENEWAL_THRESHOLD_SEC;
     }
 }
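Note on PATCH 1/3: the renewal is driven by wall-clock age rather than the TGT's actual lifetime. getFileSystem() consults isTicketOld() and, once the threshold has passed, calls UserGroupInformation.checkTGTAndReloginFromKeytab(), which itself only re-logins when the TGT is close to expiring. A minimal standalone sketch of the same pattern, assuming a hypothetical principal "nifi@EXAMPLE.COM" and keytab "/etc/nifi.keytab" (neither value comes from the patch):

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class TicketRenewalSketch {
        private static final long THRESHOLD_SEC = TimeUnit.HOURS.toSeconds(4);
        private long lastRenewal;
        private UserGroupInformation ugi;

        public void login(Configuration config) throws IOException {
            UserGroupInformation.setConfiguration(config);
            // Placeholder principal and keytab path; substitute your own.
            ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("nifi@EXAMPLE.COM", "/etc/nifi.keytab");
            lastRenewal = System.currentTimeMillis() / 1000;
        }

        public void renewIfOld() throws IOException {
            if (System.currentTimeMillis() / 1000 - lastRenewal > THRESHOLD_SEC) {
                // Effectively a no-op unless the TGT is within its renewal window.
                ugi.checkTGTAndReloginFromKeytab();
                lastRenewal = System.currentTimeMillis() / 1000;
            }
        }
    }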
From 38ee7efeb65301d4166e0fc19f959f29ca6f734b Mon Sep 17 00:00:00 2001
From: ricky
Date: Fri, 23 Oct 2015 10:37:24 -0400
Subject: [PATCH 2/3] Remove Tuple Based Resources, Make Kerberos Renewal
 Period Configurable

---
 .../hadoop/AbstractHadoopProcessor.java       | 64 +++++++++++++++----
 1 file changed, 50 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index a11aa156430e..6fd0ac2d2c50 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.SocketFactory;
@@ -53,7 +54,6 @@
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
@@ -133,11 +133,16 @@ public ValidationResult validate(String subject, String input, ValidationContext
             .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set "
                     + "in your nifi.properties").addValidator(Validator.VALID)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR).addValidator(KERBEROS_CONFIG_VALIDATOR).build();
+    private static final PropertyDescriptor KERBEROS_RENEWAL_PERIOD = new PropertyDescriptor.Builder().name("Kerberos Renewal Period").required(false)
+            .description("Period of time which should pass before renewing the kerberos ticket").defaultValue("4 hours")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     protected static final List<PropertyDescriptor> properties;
 
     private static final Object RESOURCES_LOCK = new Object();
 
-    private static final long TICKET_RENEWAL_THRESHOLD_SEC = 4 * 3600; // renew Kerberos ticket after 4 hours
+    private long TICKET_RENEWAL_THRESHOLD_SEC;
     private long lastTicketRenewal;
 
     static {
@@ -145,6 +150,7 @@ public ValidationResult validate(String subject, String input, ValidationContext
         props.add(HADOOP_CONFIGURATION_RESOURCES);
         props.add(KERBEROS_PRINCIPAL);
         props.add(KERBEROS_KEYTAB);
+        props.add(KERBEROS_RENEWAL_PERIOD);
         properties = Collections.unmodifiableList(props);
         try {
             NIFI_PROPERTIES = NiFiProperties.getInstance();
@@ -159,11 +165,11 @@ public ValidationResult validate(String subject, String input, ValidationContext
 
     // variables shared by all threads of this processor
     // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
-    private final AtomicReference<Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>> hdfsResources = new AtomicReference<>();
+    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
 
     @Override
     protected void init(ProcessorInitializationContext context) {
-        hdfsResources.set(new Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>(null, null));
+        hdfsResources.set(null);
     }
 
     @Override
@@ -177,8 +183,13 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
     @OnScheduled
     public final void abstractOnScheduled(ProcessContext context) throws IOException {
         try {
-            Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>> resources = hdfsResources.get();
-            if (resources.getKey() == null || resources.getValue() == null) {
+            // This value will be null when called from ListHDFS, because it overrides all of the default
+            // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
+            if (context.getProperty(KERBEROS_RENEWAL_PERIOD).getValue() != null) {
+                TICKET_RENEWAL_THRESHOLD_SEC = context.getProperty(KERBEROS_RENEWAL_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+            }
+            HdfsResources resources = hdfsResources.get();
+            if (resources == null) {
                 String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
                 String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
                 dir = dir == null ? "/" : dir;
@@ -187,14 +198,14 @@ public final void abstractOnScheduled(ProcessContext context) throws IOException
             }
         } catch (IOException ex) {
             getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
-            hdfsResources.set(new Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>(null, null));
+            hdfsResources.set(null);
             throw ex;
         }
     }
 
     @OnStopped
     public final void abstractOnStopped() {
-        hdfsResources.set(new Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>>(null, null));
+        hdfsResources.set(null);
     }
 
     private static Configuration getConfigurationFromResources(String configResources) throws IOException {
@@ -228,7 +239,7 @@ private static Configuration getConfigurationFromResource
     /*
      * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
      */
-    Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>> resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
+    HdfsResources resetHDFSResources(String configResources, String dir, ProcessContext context) throws IOException {
         // org.apache.hadoop.conf.Configuration saves its current thread context class loader to use for threads that it creates
         // later to do I/O. We need this class loader to be the NarClassLoader instead of the magical
         // NarThreadContextClassLoader.
@@ -266,7 +277,7 @@ Tuple<Configuration, Tuple<FileSystem, UserGroupInformation>> resetHDFSResources
             config.set(disableCacheName, "true");
             getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                     new Object[] { fs.getWorkingDirectory(), fs.getDefaultBlockSize(new Path(dir)), fs.getDefaultReplication(new Path(dir)), config.toString() });
-            return new Tuple<>(config, new Tuple<>(fs, ugi));
+            return new HdfsResources(config, fs, ugi);
 
         } finally {
             Thread.currentThread().setContextClassLoader(savedClassLoader);
@@ -398,19 +409,19 @@ public static String getPathDifference(final Path root, final Path child) {
     }
 
     protected Configuration getConfiguration() {
-        return hdfsResources.get().getKey();
+        return hdfsResources.get().getConfiguration();
     }
 
     protected FileSystem getFileSystem() {
         // if kerberos is enabled, check if the ticket should be renewed before returning the FS
         if (getUserGroupInformation() != null && isTicketOld()) {
-            renewKerberosTicket(hdfsResources.get().getValue().getValue());
+            renewKerberosTicket(getUserGroupInformation());
         }
-        return hdfsResources.get().getValue().getKey();
+        return hdfsResources.get().getFileSystem();
     }
 
     protected UserGroupInformation getUserGroupInformation() {
-        return hdfsResources.get().getValue().getValue();
+        return hdfsResources.get().getUserGroupInformation();
     }
 
     protected void renewKerberosTicket(UserGroupInformation ugi) {
@@ -430,4 +441,29 @@ protected void renewKerberosTicket(UserGroupInformation ugi) {
     protected boolean isTicketOld() {
         return (System.currentTimeMillis() / 1000 - lastTicketRenewal) > TICKET_RENEWAL_THRESHOLD_SEC;
     }
+
+
+    protected class HdfsResources {
+        private Configuration configuration;
+        private FileSystem fileSystem;
+        private UserGroupInformation userGroupInformation;
+
+        public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
+            this.configuration = configuration;
+            this.fileSystem = fileSystem;
+            this.userGroupInformation = userGroupInformation;
+        }
+
+        public Configuration getConfiguration() {
+            return configuration;
+        }
+
+        public FileSystem getFileSystem() {
+            return fileSystem;
+        }
+
+        public UserGroupInformation getUserGroupInformation() {
+            return userGroupInformation;
+        }
+    }
 }
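Note on PATCH 2/3: the new "Kerberos Renewal Period" property is validated by StandardValidators.TIME_PERIOD_VALIDATOR and read with asTimePeriod(TimeUnit.SECONDS), so the default "4 hours" arrives as 14400 seconds. The sketch below is only a rough stand-in for that conversion (it is not NiFi's actual parser and handles just a few units):

    import java.util.concurrent.TimeUnit;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TimePeriodSketch {
        // Accepts strings like "30 secs", "10 minutes", "4 hours".
        private static final Pattern PERIOD = Pattern.compile("(\\d+)\\s*(secs?|seconds?|mins?|minutes?|hours?)");

        public static long toSeconds(String period) {
            Matcher m = PERIOD.matcher(period.trim().toLowerCase());
            if (!m.matches()) {
                throw new IllegalArgumentException("Unparseable time period: " + period);
            }
            long amount = Long.parseLong(m.group(1));
            String unit = m.group(2);
            if (unit.startsWith("sec")) {
                return amount;
            }
            if (unit.startsWith("min")) {
                return TimeUnit.MINUTES.toSeconds(amount);
            }
            return TimeUnit.HOURS.toSeconds(amount);
        }

        public static void main(String[] args) {
            System.out.println(toSeconds("4 hours")); // prints 14400
        }
    }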
From 4713b56cb6ce6aad8e75a18189f1cc67f6208500 Mon Sep 17 00:00:00 2001
From: ricky
Date: Fri, 23 Oct 2015 15:56:47 -0400
Subject: [PATCH 3/3] Camel Case Kerberos Threshold / Fix Potential NPE

---
 .../hadoop/AbstractHadoopProcessor.java       | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 6fd0ac2d2c50..5021921210e3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -142,7 +142,7 @@ public ValidationResult validate(String subject, String input, ValidationContext
 
     private static final Object RESOURCES_LOCK = new Object();
 
-    private long TICKET_RENEWAL_THRESHOLD_SEC;
+    private long ticketRenewalThresholdSeconds;
     private long lastTicketRenewal;
 
     static {
@@ -169,7 +169,7 @@ public ValidationResult validate(String subject, String input, ValidationContext
 
     @Override
     protected void init(ProcessorInitializationContext context) {
-        hdfsResources.set(null);
+        hdfsResources.set(new HdfsResources(null, null, null));
     }
 
     @Override
@@ -186,10 +186,10 @@ public final void abstractOnScheduled(ProcessContext context) throws IOException
             // This value will be null when called from ListHDFS, because it overrides all of the default
             // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos
             if (context.getProperty(KERBEROS_RENEWAL_PERIOD).getValue() != null) {
-                TICKET_RENEWAL_THRESHOLD_SEC = context.getProperty(KERBEROS_RENEWAL_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+                ticketRenewalThresholdSeconds = context.getProperty(KERBEROS_RENEWAL_PERIOD).asTimePeriod(TimeUnit.SECONDS);
             }
             HdfsResources resources = hdfsResources.get();
-            if (resources == null) {
+            if (resources.getConfiguration() == null) {
                 String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
                 String dir = context.getProperty(DIRECTORY_PROP_NAME).getValue();
                 dir = dir == null ? "/" : dir;
@@ -198,14 +198,14 @@ public final void abstractOnScheduled(ProcessContext context) throws IOException
             }
         } catch (IOException ex) {
             getLogger().error("HDFS Configuration error - {}", new Object[] { ex });
-            hdfsResources.set(null);
+            hdfsResources.set(new HdfsResources(null, null, null));
             throw ex;
         }
     }
 
     @OnStopped
     public final void abstractOnStopped() {
-        hdfsResources.set(null);
+        hdfsResources.set(new HdfsResources(null, null, null));
     }
 
     private static Configuration getConfigurationFromResources(String configResources) throws IOException {
@@ -428,7 +428,7 @@ protected void renewKerberosTicket(UserGroupInformation ugi) {
         try {
             getLogger().info(String.format("Kerberos ticket age exceeds threshold [%d seconds], " +
                     "attempting to renew ticket for user [%s]",
-                    TICKET_RENEWAL_THRESHOLD_SEC, ugi.getUserName()));
+                    ticketRenewalThresholdSeconds, ugi.getUserName()));
             ugi.checkTGTAndReloginFromKeytab();
             lastTicketRenewal = System.currentTimeMillis() / 1000;
             getLogger().info("Kerberos ticket successfully renewed!");
@@ -439,7 +439,7 @@ protected void renewKerberosTicket(UserGroupInformation ugi) {
     }
 
     protected boolean isTicketOld() {
-        return (System.currentTimeMillis() / 1000 - lastTicketRenewal) > TICKET_RENEWAL_THRESHOLD_SEC;
+        return (System.currentTimeMillis() / 1000 - lastTicketRenewal) > ticketRenewalThresholdSeconds;
     }