Deserialize publish requests on generic thread-pool #108814
Conversation
Hi @nicktindall, I've created a changelog YAML for you.
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (outdated, resolved review thread)
```java
acceptState(
    incomingState,
    transportChannel,
    (acceptedState) -> lastSeenClusterState.compareAndSet(lastSeen, acceptedState)
);
```
`onSuccess` will end up being called by the CLUSTER_COORDINATION thread only after a successful apply, so I wonder if we need to use compare-and-set here. I don't think it's possible for `lastSeenClusterState` to have changed between this being dispatched and the new state being successfully applied. I would think either:

- the state has not changed and `onSuccess` is called correctly, or
- the state has changed and the apply fails due to the version check, so `onSuccess` is not called.
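The two cases above can be sketched with a plain `AtomicReference` (a minimal stand-in: the real code holds an actual `ClusterState` and compares references, so plain `Object`s are used here; `runScenario` is an invented name):

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the compare-and-set in the quoted snippet. State identity
// matters (the real code compares ClusterState references), so plain Objects
// stand in for cluster states. All names here are illustrative.
public class LastSeenCasDemo {
    public static String runScenario() {
        Object base = new Object();     // the state the diff was built from
        Object accepted = new Object(); // the state after applying the diff
        Object other = new Object();    // some unrelated newer state

        AtomicReference<Object> lastSeen = new AtomicReference<>(base);

        // Case 1: lastSeen unchanged since dispatch, CAS publishes the accepted state.
        boolean unchanged = lastSeen.compareAndSet(base, accepted);

        // Case 2: lastSeen moved on (another publication won), so the stale CAS is a no-op.
        lastSeen.set(other);
        boolean stale = lastSeen.compareAndSet(base, accepted);

        return unchanged + " " + stale;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // true false
    }
}
```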
The reason I raise this is that we could do away with the `onSuccess` callback if we were able to safely call `lastSeenClusterState.set` for both the full and diff payloads.
I think it can change between dispatch and execution, yes: there could be another update in flight when we read the value, which then completes and updates it before we get to run. I am however not sure whether this matters; I'll think about it a little more.
This one still concerns me a bit... the task of applying an update is done in three steps:

1. deserialise payload (on GENERIC)
2. apply payload if valid (on CLUSTER_COORDINATION)
3. update `lastSeen`, send response (on CLUSTER_COORDINATION)

Only (2) happens in the mutex. So you could have the following interleaving (for cluster states `a` at version 7 and `b` at version 8):

1. a-1
2. b-1
3. a-2 (succeeds, bump version to 7)
4. b-2 (succeeds, bump version to 8)
5. b-3 (set `lastSeen` to 8)
6. a-3 (set `lastSeen` to 7)

This means the next diff would be applied to 7, and a 7/9 hybrid would be applied locally?

The above could only happen if there were multiple threads in the CLUSTER_COORDINATION pool, but you said that is configurable. Unless I've missed something.

Possible solutions:

1. Use compare-and-set also for the non-diff case
   - this feels wrong, I suspect there are times it is correct to update `lastSeen` even though it changed since we took a reference to it
2. Put an additional check to only bump `lastSeen` when term and version are >= the existing term and version?
   - I don't like this because it'd be duplicating business logic
3. Move (3) into the mutex?
   - This feels least bad to me

Do we run tests with CLUSTER_COORDINATION pool size > 1?
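The six-step interleaving above can be replayed deterministically. A minimal sketch, with an `AtomicLong` of version numbers standing in for cluster states and invented method names (`replayWithSet`, `replayWithCas`); the states here go 6 -> 7 (task a) and 7 -> 8 (task b):

```java
import java.util.concurrent.atomic.AtomicLong;

// Deterministic replay of the interleaving above. Step (2) of each task runs
// in order inside the mutex; step (3) runs unsynchronised, and here b-3
// overtakes a-3.
public class InterleavingDemo {

    // Plain set(): the late a-3 regresses lastSeen from 8 back to 7, even
    // though version 8 has already been applied locally.
    public static long replayWithSet() {
        AtomicLong lastSeen = new AtomicLong(6);
        lastSeen.set(8); // b-3 (b's diff was based on 7)
        lastSeen.set(7); // a-3 (a's diff was based on 6): the stale write wins
        return lastSeen.get();
    }

    // compareAndSet(): b-3 fails because lastSeen is still 6 when it runs, so
    // the worst case is a rejected follow-up diff (and a full-state resend),
    // never a silent regression past an unrelated write.
    public static long replayWithCas() {
        AtomicLong lastSeen = new AtomicLong(6);
        lastSeen.compareAndSet(7, 8); // b-3: fails, lastSeen is 6
        lastSeen.compareAndSet(6, 7); // a-3: succeeds
        return lastSeen.get();
    }

    public static void main(String[] args) {
        System.out.println(replayWithSet()); // 7
        System.out.println(replayWithCas()); // 7
    }
}
```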
The sequence of 6 steps is indeed something that could have happened prior to #83576, and can again happen after this change: we could end up having applied state version 8 but with `lastSeen` at state version 7. But that's ok, because each diff includes a UUID which identifies the base version, which we check here:

elasticsearch/server/src/main/java/org/elasticsearch/cluster/ClusterState.java, lines 1170 to 1172 (in 9ba5651):
```java
if (fromUuid.equals(state.stateUUID) == false) {
    throw new IncompatibleClusterStateVersionException(state.version, state.stateUUID, toVersion, fromUuid);
}
```
That `IncompatibleClusterStateVersionException` is caught and handled here (lines 473 to 483 in 638a450):
```java
if (transportException.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
    logger.debug(
        () -> format(
            "resending full cluster state to node %s reason %s",
            destination,
            transportException.getDetailedMessage()
        )
    );
    sendFullClusterState(destination, delegate);
    return;
}
```
IOW, if we receive a diff between versions 8 and 9 but `lastSeen` is at version 7, then we reject the diff and the master sends us the full state at version 9 for us to apply. Somewhat inefficient for sure, but still correct.
This case will be exercised in `org.elasticsearch.cluster.coordination.CoordinatorTests` in a `cluster.runRandomly()` call, but with rather low probability I think. We could try adding a test there which specifically checks this case.
Great! Not a problem we need to solve here then :)
Pinging @elastic/es-distributed (Team:Distributed)
Some initial comments, mostly stylistic though.
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (outdated, resolved review thread)
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (outdated, resolved review thread)
```java
acceptState(
    incomingState,
    transportChannel,
    (acceptedState) -> lastSeenClusterState.compareAndSet(lastSeen, acceptedState)
);
```
> I think it can change between dispatch and execution, yes: there could be another update in flight when we read the value, which then completes and updates it before we get to run. I am however not sure whether this matters; I'll think about it a little more.

We can make `incomingState` final if we factor out the deserialisation and application of the diff.
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (resolved review thread)
One nit, otherwise LGTM.
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (resolved review thread)
```java
// 6. nodes deserialize committed cluster state
// 7. nodes apply committed cluster state
// 8. master receives ApplyCommitResponses
// 9. apply committed state on master (last one to apply cluster state)
// 10. complete the publication listener back on the master service thread
public static final int CLUSTER_STATE_UPDATE_NUMBER_OF_DELAYS = 10;
```
👍 except that this extra step is happening between steps 3 & 4 in the old list. `org.elasticsearch.cluster.coordination.ApplyCommitRequest` only carries the term and version; it's `org.elasticsearch.cluster.coordination.PublishRequest` which carries the state that's being published.
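The distinction between the two request types can be sketched with tiny stand-in records (the real classes live in `org.elasticsearch.cluster.coordination`; everything below, including `commitFor`, is invented for illustration):

```java
// Illustrative only: PublishRequest carries the full state, while
// ApplyCommitRequest identifies an already-published state by term + version.
public class PublicationMessagesDemo {
    public record ClusterStateStub(long term, long version, String payload) {}
    public record PublishRequest(ClusterStateStub state) {}      // carries the state itself
    public record ApplyCommitRequest(long term, long version) {} // carries only coordinates

    // A commit request can be derived from a publish request by dropping the payload.
    public static ApplyCommitRequest commitFor(PublishRequest publish) {
        ClusterStateStub s = publish.state();
        return new ApplyCommitRequest(s.term(), s.version());
    }

    public static void main(String[] args) {
        PublishRequest p = new PublishRequest(new ClusterStateStub(3, 42, "state-bytes"));
        System.out.println(commitFor(p)); // ApplyCommitRequest[term=3, version=42]
    }
}
```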
LGTM (one more tiny suggestion but no need for another review)
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (outdated, resolved review thread)
server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java (outdated, resolved review thread)
…blicationTransportHandler.java Co-authored-by: David Turner <david.turner@elastic.co>
…blicationTransportHandler.java Co-authored-by: David Turner <david.turner@elastic.co>
This PR moves the `publish_state` handler from the CLUSTER_COORDINATION thread pool to the GENERIC one. This means the initial handling of the publish request, including the deserialisation of the cluster state, happens on one of the GENERIC threads instead of the CLUSTER_COORDINATION thread. Once we have deserialised the cluster state and done some validation, we delegate to the CLUSTER_COORDINATION pool to apply the new state.

The consequences of this include

Closes #106352
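The two-pool dispatch described in the PR can be sketched with plain executors. A hand-wavy sketch only: the pool and method names below are illustrative, not the real `PublicationTransportHandler` API, and string concatenation stands in for deserialisation and state application:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative two-pool pipeline: deserialise on one pool, apply on another.
public class PublishDispatchDemo {
    static final ExecutorService generic = Executors.newCachedThreadPool();                 // stands in for GENERIC
    static final ExecutorService clusterCoordination = Executors.newSingleThreadExecutor(); // stands in for CLUSTER_COORDINATION

    public static String handlePublishRequest(byte[] payload) {
        return CompletableFuture
            // 1. deserialise on a GENERIC thread, keeping the coordination thread free
            .supplyAsync(() -> new String(payload), generic)
            // 2. delegate to CLUSTER_COORDINATION to apply the validated state
            .thenApplyAsync(state -> "applied:" + state, clusterCoordination)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(handlePublishRequest("cluster-state".getBytes())); // applied:cluster-state
        generic.shutdown();
        clusterCoordination.shutdown();
    }
}
```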