Toggle navigation
Home
org.apache.pulsar.broker.lookup.LookupResult to java.util.Optional<org.apache.pulsar.broker.lookup.LookupResult>
No. of Instances - 5
No. of Commits - 1
No. of Projects - 1 (pulsar)
Hierarchy/Composition: -
Primitive Info: -
Namespace: Internal -> JDK
Mapping:
Add or remove method invocation
ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> { if (!nsData.isPresent()) { if (readOnly) { future.completeExceptionally(new IllegalStateException(String.format("Can't find owner of ServiceUnit: %s",bundle))); } else { pulsar.getExecutor().execute(() -> { searchForCandidateBroker(bundle,future,authoritative); } ); } } else if (nsData.get().isDisabled()) { future.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded",bundle))); } else { if (LOG.isDebugEnabled()) { LOG.debug("Namespace bundle {} already owned by {} ",bundle,nsData); } future.complete(new LookupResult(nsData.get())); } } )
to
ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> { if (!nsData.isPresent()) { if (readOnly) { future.complete(Optional.empty()); } else { pulsar.getExecutor().execute(() -> { searchForCandidateBroker(bundle,future,authoritative); } ); } } else if (nsData.get().isDisabled()) { future.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is being unloaded",bundle))); } else { if (LOG.isDebugEnabled()) { LOG.debug("Namespace bundle {} already owned by {} ",bundle,nsData); } future.complete(Optional.of(new LookupResult(nsData.get()))); } } )
lookupFuture.thenAccept(result -> { if (result == null) { log.warn("No broker was found available for topic {}",topic); completeLookupResponseExceptionally(asyncResponse,new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); return; } if (result.isRedirect()) { boolean newAuthoritative=this.isLeaderBroker(); URI redirect; try { String redirectUrl=isRequestHttps() ? result.getLookupData().getHttpUrlTls() : result.getLookupData().getHttpUrl(); redirect=new URI(String.format("%s%s%s?authoritative=%s",redirectUrl,"/lookup/v2/destination/",topic.getLookupName(),newAuthoritative)); } catch ( URISyntaxException e) { log.error("Error in preparing redirect url for {}: {}",topic,e.getMessage(),e); completeLookupResponseExceptionally(asyncResponse,e); return; } if (log.isDebugEnabled()) { log.debug("Redirect lookup for topic {} to {}",topic,redirect); } completeLookupResponseExceptionally(asyncResponse,new WebApplicationException(Response.temporaryRedirect(redirect).build())); } else { if (log.isDebugEnabled()) { log.debug("Lookup succeeded for topic {} -- broker: {}",topic,result.getLookupData()); } completeLookupResponseSuccessfully(asyncResponse,result.getLookupData()); } } )
to
lookupFuture.thenAccept(optionalResult -> { if (optionalResult == null || !optionalResult.isPresent()) { log.warn("No broker was found available for topic {}",topic); completeLookupResponseExceptionally(asyncResponse,new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE)); return; } LookupResult result=optionalResult.get(); if (result.isRedirect()) { boolean newAuthoritative=this.isLeaderBroker(); URI redirect; try { String redirectUrl=isRequestHttps() ? result.getLookupData().getHttpUrlTls() : result.getLookupData().getHttpUrl(); redirect=new URI(String.format("%s%s%s?authoritative=%s",redirectUrl,"/lookup/v2/destination/",topic.getLookupName(),newAuthoritative)); } catch ( URISyntaxException e) { log.error("Error in preparing redirect url for {}: {}",topic,e.getMessage(),e); completeLookupResponseExceptionally(asyncResponse,e); return; } if (log.isDebugEnabled()) { log.debug("Redirect lookup for topic {} to {}",topic,redirect); } completeLookupResponseExceptionally(asyncResponse,new WebApplicationException(Response.temporaryRedirect(redirect).build())); } else { if (log.isDebugEnabled()) { log.debug("Lookup succeeded for topic {} -- broker: {}",topic,result.getLookupData()); } completeLookupResponseSuccessfully(asyncResponse,result.getLookupData()); } } )
ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> { if (ownerInfo.isDisabled()) { if (LOG.isDebugEnabled()) { LOG.debug("Namespace bundle {} is currently being unloaded",bundle); } lookupFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded",bundle))); } else { pulsar.loadNamespaceDestinations(bundle); lookupFuture.complete(new LookupResult(ownerInfo)); } } )
to
ownershipCache.tryAcquiringOwnership(bundle).thenAccept(ownerInfo -> { if (ownerInfo.isDisabled()) { if (LOG.isDebugEnabled()) { LOG.debug("Namespace bundle {} is currently being unloaded",bundle); } lookupFuture.completeExceptionally(new IllegalStateException(String.format("Namespace bundle %s is currently being unloaded",bundle))); } else { pulsar.loadNamespaceDestinations(bundle); lookupFuture.complete(Optional.of(new LookupResult(ownerInfo))); } } )
createLookupResult(candidateBroker).thenAccept(lookupResult -> lookupFuture.complete(lookupResult))
to
createLookupResult(candidateBroker).thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))