diff --git a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index f25ec8a10d70c..cafa8908ba073 100644
--- a/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -19,9 +19,13 @@
 
 package org.elasticsearch.action.bulk;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
@@ -40,11 +44,17 @@
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.cluster.routing.ShardIterator;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.rest.RestStatus;
@@ -53,10 +63,15 @@
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportService;
 
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -73,6 +88,50 @@
     private final TransportShardBulkAction shardBulkAction;
 
     private final TransportCreateIndexAction createIndexAction;
+
+    /**
+     * Caches reverse lookups (socket address -> host name) for the per-shard bulk
+     * statistics log lines. Entries expire 60 minutes after they were last accessed.
+     */
+    private final LoadingCache<InetSocketAddress, String> reverseIPLookupCache =
+            CacheBuilder.newBuilder().expireAfterAccess(60, TimeUnit.MINUTES).
+                    build(new CacheLoader<InetSocketAddress, String>() {
+                        @Override
+                        public String load(InetSocketAddress key) throws Exception {
+                            return key.getHostName();
+                        }
+                    });
+
+    /**
+     * Timing and size figures collected for one shard-level request of a bulk call.
+     * The natural ordering is descending by latency, i.e. the slowest shard sorts first.
+     */
+    static final class BulkStats implements Comparable<BulkStats> {
+        final ShardId shardId;
+        final int numItems;
+        /** Value of System.currentTimeMillis() when the shard request was dispatched. */
+        final long startMillis;
+        /** Round-trip time of the shard request; negative until a response was recorded. */
+        long latencyMillis = -1L;
+        int failedItems = 0;
+        long bytes = 0;
+        ShardRouting shardRouting;
+
+        BulkStats(long startMillis, int numItems, ShardId shardId) {
+            this.startMillis = startMillis;
+            this.numItems = numItems;
+            this.shardId = shardId;
+        }
+
+        @Override
+        public int compareTo(BulkStats o) {
+            // descending by latency so that the slowest shard sorts first
+            if (o.latencyMillis == latencyMillis) {
+                return 0;
+            }
+            return o.latencyMillis < latencyMillis ? -1 : 1;
+        }
+    }
 
     @Inject
     public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService,
@@ -234,12 +293,20 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
 
         // first, go over all the requests and create a ShardId -> Operations mapping
         Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();
-
+        // per-shard routing entry and index payload size, gathered for the bulk statistics log
+        final Map<ShardId, ShardRouting> shardRoutingInfo = Maps.newHashMap();
+        final Map<ShardId, Long> perShardReqBytes = Maps.newHashMap();
         for (int i = 0; i < bulkRequest.requests.size(); i++) {
             ActionRequest request = bulkRequest.requests.get(i);
             if (request instanceof IndexRequest) {
                 IndexRequest indexRequest = (IndexRequest) request;
-                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
+                ShardIterator shardIt = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing());
+                ShardId shardId = shardIt.shardId();
+                shardRoutingInfo.put(shardId, shardIt.nextOrNull());
+                BytesReference source = indexRequest.source();
+                long sourceBytes = source == null ? 0L : source.length();
+                Long shardBytes = perShardReqBytes.get(shardId);
+                perShardReqBytes.put(shardId, shardBytes == null ? sourceBytes : shardBytes + sourceBytes);
                 List<BulkItemRequest> list = requestsByShard.get(shardId);
                 if (list == null) {
                     list = Lists.newArrayList();
@@ -291,6 +358,7 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
         }
 
         final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
+        final Map<ShardId, BulkStats> shardStats = new ConcurrentHashMap<ShardId, BulkStats>();
         for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
             final ShardId shardId = entry.getKey();
             final List<BulkItemRequest> requests = entry.getValue();
@@ -298,11 +366,22 @@ private void executeBulk(final BulkRequest bulkRequest, final long startTime, fi
             bulkShardRequest.replicationType(bulkRequest.replicationType());
             bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
             bulkShardRequest.timeout(bulkRequest.timeout());
+            shardStats.put(shardId, new BulkStats(System.currentTimeMillis(), requests.size(), shardId));
             shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                 @Override
                 public void onResponse(BulkShardResponse bulkShardResponse) {
+                    final BulkStats bk = shardStats.get(shardId);
+                    bk.latencyMillis = System.currentTimeMillis() - bk.startMillis;
+                    bk.shardRouting = shardRoutingInfo.get(shardId);
+                    // only the IndexRequest branch records a size, so a shard that received
+                    // no index items has no entry and must not be unboxed blindly
+                    final Long shardBytes = perShardReqBytes.get(shardId);
+                    bk.bytes = shardBytes == null ? 0L : shardBytes;
                     for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                         responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
+                        if (bulkItemResponse.isFailed()) {
+                            ++bk.failedItems;
+                        }
                     }
                     if (counter.decrementAndGet() == 0) {
                         finishHim();
@@ -335,7 +414,64 @@ public void onFailure(Throwable e) {
                 }
 
                 private void finishHim() {
-                    listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), System.currentTimeMillis() - startTime));
+                    final long finishMillis = System.currentTimeMillis();
+                    final long bulkTimeMillis = finishMillis - startTime;
+                    if (logger.isInfoEnabled()) {
+                        logShardStats(finishMillis, bulkTimeMillis);
+                    }
+                    listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), bulkTimeMillis));
+                }
+
+                /**
+                 * Logs one JSON line per shard touched by this bulk call, slowest shard first.
+                 * Any exception raised in here is logged and swallowed so that it cannot
+                 * prevent the bulk response from being delivered.
+                 */
+                private void logShardStats(long finishMillis, long bulkTimeMillis) {
+                    // correlates the log lines of one bulk call; not guaranteed to be unique
+                    final long bulkId = System.nanoTime();
+                    try {
+                        DiscoveryNodes dn = clusterService.state().getNodes();
+                        Collection<BulkStats> statsCollection = shardStats.values();
+                        List<BulkStats> statsList = new ArrayList<BulkStats>(statsCollection);
+                        for (BulkStats stats : statsList) {
+                            if (stats.latencyMillis < 0) {
+                                // no response was recorded for this shard, so fall back to the
+                                // time elapsed between dispatch and the end of the bulk call
+                                stats.latencyMillis = finishMillis - stats.startMillis;
+                            }
+                        }
+                        CollectionUtil.introSort(statsList);
+                        int slowRank = 0;
+                        // since ES does not handle arrays well, do separate output of each shard
+                        for (BulkStats stats : statsList) {
+                            XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+                            builder.field("bulkId", bulkId);
+                            builder.field("totalShards", statsList.size());
+                            builder.field("totalBulkReqMillis", bulkTimeMillis);
+                            builder.field("bulkItems", bulkRequest.requests.size());
+                            builder.field("shardId", stats.shardId.getId());
+                            builder.field("index", stats.shardId.getIndex());
+                            builder.field("shardMillis", stats.latencyMillis);
+                            builder.field("slowRank", ++slowRank);
+                            builder.field("events", stats.numItems);
+                            builder.field("bytes", stats.bytes);
+                            builder.field("failedItems", stats.failedItems);
+                            if (stats.shardRouting != null) {
+                                final String nodeId = stats.shardRouting.currentNodeId();
+                                builder.field("nodeId", nodeId);
+                                // skip the hostname when the node id is unknown to dn or its address is not socket based
+                                final Object address = nodeId == null || dn.get(nodeId) == null ? null : dn.get(nodeId).address();
+                                if (address instanceof InetSocketTransportAddress) {
+                                    builder.field("hostname", reverseIPLookupCache.get(((InetSocketTransportAddress) address).address()));
+                                }
+                            }
+                            builder.endObject();
+                            logger.info(builder.string());
+                        }
+                    } catch (Exception e) {
+                        logger.error("Error generating json shard stats", e);
+                    }
                 }
             });
         }