io.netty.util.concurrent.Future to org.redisson.api.RFuture
No. of Instances - 1123
No. of Commits - 3
No. of Projects - 1 (redisson)
Hierarchy/Composition: T_SUPER_R
Primitive Info: -
NameSpace: External -> Internal
Mapping:
- Cascading Type Change (Different)
- Cascading Type Change (Similar)
- Future<Long> to RFuture<Long>
- Future<RedissonLockEntry> to RFuture<RedissonLockEntry>
- Future<Void> to RFuture<Void>
- Future<RedissonLockEntry> to RFuture<RedissonLockEntry>
- Future<RemoteServiceAck> to RFuture<RemoteServiceAck>
- Add or Remove Method invocation
- checkConnectionFuture(entry,source,mainPromise,attemptPromise,details,connFuture) to checkConnectionFuture(entry,source,mainPromise,attemptPromise,details,connectionFuture)
- Update Anonymous class
- { @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } RemoteServiceAck ack=future.getNow(); if (ack == null) { Future<RemoteServiceAck> ackFutureAttempt=tryPollAckAgainAsync(optionsCopy,responseQueue,ackName); ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>(){ @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (future.getNow() == null) { Exception ex=new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: "+ request); result.tryFailure(ex); return; } awaitResultAsync(optionsCopy,result,request,responseName,ackName); } } ); } else { awaitResultAsync(optionsCopy,result,request,responseName); } } } to { @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } RemoteServiceAck ack=future.getNow(); if (ack == null) { RFuture<RemoteServiceAck> ackFutureAttempt=tryPollAckAgainAsync(optionsCopy,responseQueue,ackName); ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>(){ @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (future.getNow() == null) { Exception ex=new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: "+ request); result.tryFailure(ex); return; } awaitResultAsync(optionsCopy,result,request,responseName,ackName); } } ); } else { awaitResultAsync(optionsCopy,result,request,responseName); } } }
- { @Override public void operationComplete( Future<RemoteServiceRequest> future) throws Exception { if (!future.isSuccess()) { if (future.cause() instanceof RedissonShutdownException) { return; } subscribe(remoteInterface,requestQueue,executor); return; } final RemoteServiceRequest request=future.getNow(); if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) { log.debug("request: {} has been skipped due to ackTimeout"); subscribe(remoteInterface,requestQueue,executor); return; } final String responseName=getResponseQueueName(remoteInterface,request.getRequestId()); if (request.getOptions().isAckExpected()) { String ackName=getAckName(remoteInterface,request.getRequestId()); Future<Boolean> ackClientsFuture=commandExecutor.evalWriteAsync(responseName,LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[2]);" + "redis.call('rpush', KEYS[2], ARGV[1]);"+ "redis.call('pexpire', KEYS[2], ARGV[2]);"+ "return 1;"+ "end;"+ "return 0;",Arrays.<Object>asList(ackName,responseName),encode(new RemoteServiceAck()),request.getOptions().getAckTimeoutInMillis()); ackClientsFuture.addListener(new FutureListener<Boolean>(){ @Override public void operationComplete( Future<Boolean> future) throws Exception { if (!future.isSuccess()) { log.error("Can't send ack for request: " + request,future.cause()); if (future.cause() instanceof RedissonShutdownException) { return; } subscribe(remoteInterface,requestQueue,executor); return; } if (!future.getNow()) { subscribe(remoteInterface,requestQueue,executor); return; } executeMethod(remoteInterface,requestQueue,executor,request); } } ); } else { executeMethod(remoteInterface,requestQueue,executor,request); } } } to { @Override public void operationComplete( Future<RemoteServiceRequest> future) throws Exception { if (!future.isSuccess()) { if (future.cause() instanceof 
RedissonShutdownException) { return; } subscribe(remoteInterface,requestQueue,executor); return; } final RemoteServiceRequest request=future.getNow(); if (request.getOptions().isAckExpected() && System.currentTimeMillis() - request.getDate() > request.getOptions().getAckTimeoutInMillis()) { log.debug("request: {} has been skipped due to ackTimeout"); subscribe(remoteInterface,requestQueue,executor); return; } final String responseName=getResponseQueueName(remoteInterface,request.getRequestId()); if (request.getOptions().isAckExpected()) { String ackName=getAckName(remoteInterface,request.getRequestId()); RFuture<Boolean> ackClientsFuture=commandExecutor.evalWriteAsync(responseName,LongCodec.INSTANCE,RedisCommands.EVAL_BOOLEAN,"if redis.call('setnx', KEYS[1], 1) == 1 then " + "redis.call('pexpire', KEYS[1], ARGV[2]);" + "redis.call('rpush', KEYS[2], ARGV[1]);"+ "redis.call('pexpire', KEYS[2], ARGV[2]);"+ "return 1;"+ "end;"+ "return 0;",Arrays.<Object>asList(ackName,responseName),encode(new RemoteServiceAck()),request.getOptions().getAckTimeoutInMillis()); ackClientsFuture.addListener(new FutureListener<Boolean>(){ @Override public void operationComplete( Future<Boolean> future) throws Exception { if (!future.isSuccess()) { log.error("Can't send ack for request: " + request,future.cause()); if (future.cause() instanceof RedissonShutdownException) { return; } subscribe(remoteInterface,requestQueue,executor); return; } if (!future.getNow()) { subscribe(remoteInterface,requestQueue,executor); return; } executeMethod(remoteInterface,requestQueue,executor,request); } } ); } else { executeMethod(remoteInterface,requestQueue,executor,request); } } }
- { @Override public void operationComplete( Future<RedisConnection> future) throws Exception { if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } if (!future.isSuccess()) { scheduleCheck(entry); return; } final RedisConnection c=future.getNow(); if (!c.isActive()) { c.closeAsync(); scheduleCheck(entry); return; } final FutureListener<String> pingListener=new FutureListener<String>(){ @Override public void operationComplete( Future<String> future) throws Exception { try { if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } if (future.isSuccess() && "PONG".equals(future.getNow())) { entry.resetFailedAttempts(); Promise<Void> promise=connectionManager.newPromise(); promise.addListener(new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(),entry.getClient().getAddr().getPort(),FreezeReason.RECONNECT); log.info("slave {} successfully reconnected",entry.getClient().getAddr()); } else { synchronized (entry) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) { entry.setFreezed(false); entry.setFreezeReason(null); log.info("host {} successfully reconnected",entry.getClient().getAddr()); } } } } } ); initConnections(entry,promise,false); } else { scheduleCheck(entry); } } finally { c.closeAsync(); } } } ; if (entry.getConfig().getPassword() != null) { Future<Void> temp=c.async(RedisCommands.AUTH,config.getPassword()); FutureListener<Void> listener=new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { ping(c,pingListener); } } ; temp.addListener(listener); } else { ping(c,pingListener); } } } to { @Override public void operationComplete( Future<RedisConnection> future) throws Exception { if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } if 
(!future.isSuccess()) { scheduleCheck(entry); return; } final RedisConnection c=future.getNow(); if (!c.isActive()) { c.closeAsync(); scheduleCheck(entry); return; } final FutureListener<String> pingListener=new FutureListener<String>(){ @Override public void operationComplete( Future<String> future) throws Exception { try { if (entry.getFreezeReason() != FreezeReason.RECONNECT || !entry.isFreezed()) { return; } if (future.isSuccess() && "PONG".equals(future.getNow())) { entry.resetFailedAttempts(); RPromise<Void> promise=connectionManager.newPromise(); promise.addListener(new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(),entry.getClient().getAddr().getPort(),FreezeReason.RECONNECT); log.info("slave {} successfully reconnected",entry.getClient().getAddr()); } else { synchronized (entry) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) { entry.setFreezed(false); entry.setFreezeReason(null); log.info("host {} successfully reconnected",entry.getClient().getAddr()); } } } } } ); initConnections(entry,promise,false); } else { scheduleCheck(entry); } } finally { c.closeAsync(); } } } ; if (entry.getConfig().getPassword() != null) { RFuture<Void> temp=c.async(RedisCommands.AUTH,config.getPassword()); FutureListener<Void> listener=new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { ping(c,pingListener); } } ; temp.addListener(listener); } else { ping(c,pingListener); } } }
- { @Override public void operationComplete( Future<Boolean> future) throws Exception { if (!future.isSuccess()) { result.setFailure(future.cause()); return; } if (future.getNow()) { result.setSuccess(true); return; } final long current=System.currentTimeMillis(); final AtomicReference<Timeout> futureRef=new AtomicReference<Timeout>(); final Future<RedissonLockEntry> subscribeFuture=subscribe(); subscribeFuture.addListener(new FutureListener<RedissonLockEntry>(){ @Override public void operationComplete( Future<RedissonLockEntry> future) throws Exception { if (!future.isSuccess()) { result.setFailure(future.cause()); return; } if (futureRef.get() != null) { futureRef.get().cancel(); } long elapsed=System.currentTimeMillis() - current; time.addAndGet(-elapsed); if (time.get() < 0) { unsubscribe(subscribeFuture); result.trySuccess(false); return; } tryAcquireAsync(time,permits,subscribeFuture,result); } } ); if (!subscribeFuture.isDone()) { Timeout scheduledFuture=commandExecutor.getConnectionManager().newTimeout(new TimerTask(){ @Override public void run( Timeout timeout) throws Exception { if (!subscribeFuture.isDone()) { result.trySuccess(false); } } } ,time.get(),TimeUnit.MILLISECONDS); futureRef.set(scheduledFuture); } } } to { @Override public void operationComplete( Future<Boolean> future) throws Exception { if (!future.isSuccess()) { result.setFailure(future.cause()); return; } if (future.getNow()) { result.setSuccess(true); return; } final long current=System.currentTimeMillis(); final AtomicReference<Timeout> futureRef=new AtomicReference<Timeout>(); final RFuture<RedissonLockEntry> subscribeFuture=subscribe(); subscribeFuture.addListener(new FutureListener<RedissonLockEntry>(){ @Override public void operationComplete( Future<RedissonLockEntry> future) throws Exception { if (!future.isSuccess()) { result.setFailure(future.cause()); return; } if (futureRef.get() != null) { futureRef.get().cancel(); } long elapsed=System.currentTimeMillis() - current; 
time.addAndGet(-elapsed); if (time.get() < 0) { unsubscribe(subscribeFuture); result.trySuccess(false); return; } tryAcquireAsync(time,permits,subscribeFuture,result); } } ); if (!subscribeFuture.isDone()) { Timeout scheduledFuture=commandExecutor.getConnectionManager().newTimeout(new TimerTask(){ @Override public void run( Timeout timeout) throws Exception { if (!subscribeFuture.isDone()) { result.trySuccess(false); } } } ,time.get(),TimeUnit.MILLISECONDS); futureRef.set(scheduledFuture); } } }
- { @Override public void operationComplete( Future<Map<String,String>> future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_INFO for " + connection.getRedisClient().getAddr(),future.cause()); result.setFailure(future.cause()); return; } Map<String,String> params=future.getNow(); if ("fail".equals(params.get("cluster_state"))) { RedisException e=new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: "+ partition.getSlotRanges()+ ". Reason - cluster_state:fail"); log.error("cluster_state:fail for " + connection.getRedisClient().getAddr()); result.setFailure(e); return; } MasterSlaveServersConfig config=create(cfg); config.setMasterAddress(partition.getMasterAddress()); final MasterSlaveEntry e; List<Future<Void>> futures=new ArrayList<Future<Void>>(); if (config.getReadMode() == ReadMode.MASTER) { e=new SingleEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); } else { config.setSlaveAddresses(partition.getSlaveAddresses()); e=new MasterSlaveEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); List<Future<Void>> fs=e.initSlaveBalancer(partition.getFailedSlaveAddresses()); futures.addAll(fs); if (!partition.getSlaveAddresses().isEmpty()) { log.info("slaves: {} added for slot ranges: {}",partition.getSlaveAddresses(),partition.getSlotRanges()); if (!partition.getFailedSlaveAddresses().isEmpty()) { log.warn("slaves: {} is down for slot ranges: {}",partition.getFailedSlaveAddresses(),partition.getSlotRanges()); } } } Future<Void> f=e.setupMasterEntry(config.getMasterAddress().getHost(),config.getMasterAddress().getPort()); final Promise<Void> initFuture=newPromise(); futures.add(initFuture); f.addListener(new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { if (!future.isSuccess()) { log.error("Can't add master: {} for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); 
initFuture.setFailure(future.cause()); return; } for ( Integer slot : partition.getSlots()) { addEntry(slot,e); lastPartitions.put(slot,partition); } log.info("master: {} added for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setSuccess(null); } } ); result.setSuccess(futures); } } to { @Override public void operationComplete( Future<Map<String,String>> future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_INFO for " + connection.getRedisClient().getAddr(),future.cause()); result.setFailure(future.cause()); return; } Map<String,String> params=future.getNow(); if ("fail".equals(params.get("cluster_state"))) { RedisException e=new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: "+ partition.getSlotRanges()+ ". Reason - cluster_state:fail"); log.error("cluster_state:fail for " + connection.getRedisClient().getAddr()); result.setFailure(e); return; } MasterSlaveServersConfig config=create(cfg); config.setMasterAddress(partition.getMasterAddress()); final MasterSlaveEntry e; List<RFuture<Void>> futures=new ArrayList<RFuture<Void>>(); if (config.getReadMode() == ReadMode.MASTER) { e=new SingleEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); } else { config.setSlaveAddresses(partition.getSlaveAddresses()); e=new MasterSlaveEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); List<RFuture<Void>> fs=e.initSlaveBalancer(partition.getFailedSlaveAddresses()); futures.addAll(fs); if (!partition.getSlaveAddresses().isEmpty()) { log.info("slaves: {} added for slot ranges: {}",partition.getSlaveAddresses(),partition.getSlotRanges()); if (!partition.getFailedSlaveAddresses().isEmpty()) { log.warn("slaves: {} is down for slot ranges: {}",partition.getFailedSlaveAddresses(),partition.getSlotRanges()); } } } RFuture<Void> f=e.setupMasterEntry(config.getMasterAddress().getHost(),config.getMasterAddress().getPort()); final 
RPromise<Void> initFuture=newPromise(); futures.add(initFuture); f.addListener(new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { if (!future.isSuccess()) { log.error("Can't add master: {} for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setFailure(future.cause()); return; } for ( Integer slot : partition.getSlots()) { addEntry(slot,e); lastPartitions.put(slot,partition); } log.info("master: {} added for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setSuccess(null); } } ); result.setSuccess(futures); } }
- { @Override public void operationComplete( Future<Long> future) throws Exception { if (!future.isSuccess()) { result.setFailure(future.cause()); return; } Long ttl=future.getNow(); if (ttl == null) { result.setSuccess(true); return; } final long current=System.currentTimeMillis(); final AtomicReference<Timeout> futureRef=new AtomicReference<Timeout>(); final Future<RedissonLockEntry> subscribeFuture=subscribe(currentThreadId); subscribeFuture.addListener(new FutureListener<RedissonLockEntry>(){ @Override public void operationComplete( Future<RedissonLockEntry> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (futureRef.get() != null) { futureRef.get().cancel(); } long elapsed=System.currentTimeMillis() - current; time.addAndGet(-elapsed); if (time.get() < 0) { unsubscribe(subscribeFuture,currentThreadId); result.trySuccess(false); return; } tryLockAsync(time,leaseTime,unit,subscribeFuture,result,currentThreadId); } } ); if (!subscribeFuture.isDone()) { Timeout scheduledFuture=commandExecutor.getConnectionManager().newTimeout(new TimerTask(){ @Override public void run( Timeout timeout) throws Exception { if (!subscribeFuture.isDone()) { subscribeFuture.cancel(false); result.trySuccess(false); } } } ,time.get(),TimeUnit.MILLISECONDS); futureRef.set(scheduledFuture); } } } to { @Override public void operationComplete( Future<Long> future) throws Exception { if (!future.isSuccess()) { result.setFailure(future.cause()); return; } Long ttl=future.getNow(); if (ttl == null) { result.setSuccess(true); return; } final long current=System.currentTimeMillis(); final AtomicReference<Timeout> futureRef=new AtomicReference<Timeout>(); final RFuture<RedissonLockEntry> subscribeFuture=subscribe(currentThreadId); subscribeFuture.addListener(new FutureListener<RedissonLockEntry>(){ @Override public void operationComplete( Future<RedissonLockEntry> future) throws Exception { if (!future.isSuccess()) { 
result.tryFailure(future.cause()); return; } if (futureRef.get() != null) { futureRef.get().cancel(); } long elapsed=System.currentTimeMillis() - current; time.addAndGet(-elapsed); if (time.get() < 0) { unsubscribe(subscribeFuture,currentThreadId); result.trySuccess(false); return; } tryLockAsync(time,leaseTime,unit,subscribeFuture,result,currentThreadId); } } ); if (!subscribeFuture.isDone()) { Timeout scheduledFuture=commandExecutor.getConnectionManager().newTimeout(new TimerTask(){ @Override public void run( Timeout timeout) throws Exception { if (!subscribeFuture.isDone()) { subscribeFuture.cancel(false); result.trySuccess(false); } } } ,time.get(),TimeUnit.MILLISECONDS); futureRef.set(scheduledFuture); } } }
- { @Override public void operationComplete( Future<RedisConnection> future) throws Exception { if (!future.isSuccess()) { log.error("Can't connect to master: {} with slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); result.setFailure(future.cause()); return; } final RedisConnection connection=future.getNow(); Future<Map<String,String>> clusterFuture=connection.async(RedisCommands.CLUSTER_INFO); clusterFuture.addListener(new FutureListener<Map<String,String>>(){ @Override public void operationComplete( Future<Map<String,String>> future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_INFO for " + connection.getRedisClient().getAddr(),future.cause()); result.setFailure(future.cause()); return; } Map<String,String> params=future.getNow(); if ("fail".equals(params.get("cluster_state"))) { RedisException e=new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: "+ partition.getSlotRanges()+ ". Reason - cluster_state:fail"); log.error("cluster_state:fail for " + connection.getRedisClient().getAddr()); result.setFailure(e); return; } MasterSlaveServersConfig config=create(cfg); config.setMasterAddress(partition.getMasterAddress()); final MasterSlaveEntry e; List<Future<Void>> futures=new ArrayList<Future<Void>>(); if (config.getReadMode() == ReadMode.MASTER) { e=new SingleEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); } else { config.setSlaveAddresses(partition.getSlaveAddresses()); e=new MasterSlaveEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); List<Future<Void>> fs=e.initSlaveBalancer(partition.getFailedSlaveAddresses()); futures.addAll(fs); if (!partition.getSlaveAddresses().isEmpty()) { log.info("slaves: {} added for slot ranges: {}",partition.getSlaveAddresses(),partition.getSlotRanges()); if (!partition.getFailedSlaveAddresses().isEmpty()) { log.warn("slaves: {} is down for slot ranges: 
{}",partition.getFailedSlaveAddresses(),partition.getSlotRanges()); } } } Future<Void> f=e.setupMasterEntry(config.getMasterAddress().getHost(),config.getMasterAddress().getPort()); final Promise<Void> initFuture=newPromise(); futures.add(initFuture); f.addListener(new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { if (!future.isSuccess()) { log.error("Can't add master: {} for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setFailure(future.cause()); return; } for ( Integer slot : partition.getSlots()) { addEntry(slot,e); lastPartitions.put(slot,partition); } log.info("master: {} added for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setSuccess(null); } } ); result.setSuccess(futures); } } ); } } to { @Override public void operationComplete( Future<RedisConnection> future) throws Exception { if (!future.isSuccess()) { log.error("Can't connect to master: {} with slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); result.setFailure(future.cause()); return; } final RedisConnection connection=future.getNow(); RFuture<Map<String,String>> clusterFuture=connection.async(RedisCommands.CLUSTER_INFO); clusterFuture.addListener(new FutureListener<Map<String,String>>(){ @Override public void operationComplete( Future<Map<String,String>> future) throws Exception { if (!future.isSuccess()) { log.error("Can't execute CLUSTER_INFO for " + connection.getRedisClient().getAddr(),future.cause()); result.setFailure(future.cause()); return; } Map<String,String> params=future.getNow(); if ("fail".equals(params.get("cluster_state"))) { RedisException e=new RedisException("Failed to add master: " + partition.getMasterAddress() + " for slot ranges: "+ partition.getSlotRanges()+ ". 
Reason - cluster_state:fail"); log.error("cluster_state:fail for " + connection.getRedisClient().getAddr()); result.setFailure(e); return; } MasterSlaveServersConfig config=create(cfg); config.setMasterAddress(partition.getMasterAddress()); final MasterSlaveEntry e; List<RFuture<Void>> futures=new ArrayList<RFuture<Void>>(); if (config.getReadMode() == ReadMode.MASTER) { e=new SingleEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); } else { config.setSlaveAddresses(partition.getSlaveAddresses()); e=new MasterSlaveEntry(partition.getSlotRanges(),ClusterConnectionManager.this,config); List<RFuture<Void>> fs=e.initSlaveBalancer(partition.getFailedSlaveAddresses()); futures.addAll(fs); if (!partition.getSlaveAddresses().isEmpty()) { log.info("slaves: {} added for slot ranges: {}",partition.getSlaveAddresses(),partition.getSlotRanges()); if (!partition.getFailedSlaveAddresses().isEmpty()) { log.warn("slaves: {} is down for slot ranges: {}",partition.getFailedSlaveAddresses(),partition.getSlotRanges()); } } } RFuture<Void> f=e.setupMasterEntry(config.getMasterAddress().getHost(),config.getMasterAddress().getPort()); final RPromise<Void> initFuture=newPromise(); futures.add(initFuture); f.addListener(new FutureListener<Void>(){ @Override public void operationComplete( Future<Void> future) throws Exception { if (!future.isSuccess()) { log.error("Can't add master: {} for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setFailure(future.cause()); return; } for ( Integer slot : partition.getSlots()) { addEntry(slot,e); lastPartitions.put(slot,partition); } log.info("master: {} added for slot ranges: {}",partition.getMasterAddress(),partition.getSlotRanges()); initFuture.setSuccess(null); } } ); result.setSuccess(futures); } } ); } }
- { @Override public void operationComplete( Future<Boolean> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (optionsCopy.isAckExpected()) { final RBlockingQueue<RemoteServiceAck> responseQueue=redisson.getBlockingQueue(responseName,getCodec()); Future<RemoteServiceAck> ackFuture=responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(),TimeUnit.MILLISECONDS); ackFuture.addListener(new FutureListener<RemoteServiceAck>(){ @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } RemoteServiceAck ack=future.getNow(); if (ack == null) { Future<RemoteServiceAck> ackFutureAttempt=tryPollAckAgainAsync(optionsCopy,responseQueue,ackName); ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>(){ @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (future.getNow() == null) { Exception ex=new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: "+ request); result.tryFailure(ex); return; } awaitResultAsync(optionsCopy,result,request,responseName,ackName); } } ); } else { awaitResultAsync(optionsCopy,result,request,responseName); } } } ); } else { awaitResultAsync(optionsCopy,result,request,responseName); } } } to { @Override public void operationComplete( Future<Boolean> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (optionsCopy.isAckExpected()) { final RBlockingQueue<RemoteServiceAck> responseQueue=redisson.getBlockingQueue(responseName,getCodec()); RFuture<RemoteServiceAck> ackFuture=responseQueue.pollAsync(optionsCopy.getAckTimeoutInMillis(),TimeUnit.MILLISECONDS); ackFuture.addListener(new FutureListener<RemoteServiceAck>(){ @Override public void operationComplete( 
Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } RemoteServiceAck ack=future.getNow(); if (ack == null) { RFuture<RemoteServiceAck> ackFutureAttempt=tryPollAckAgainAsync(optionsCopy,responseQueue,ackName); ackFutureAttempt.addListener(new FutureListener<RemoteServiceAck>(){ @Override public void operationComplete( Future<RemoteServiceAck> future) throws Exception { if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } if (future.getNow() == null) { Exception ex=new RemoteServiceAckTimeoutException("No ACK response after " + optionsCopy.getAckTimeoutInMillis() + "ms for request: "+ request); result.tryFailure(ex); return; } awaitResultAsync(optionsCopy,result,request,responseName,ackName); } } ); } else { awaitResultAsync(optionsCopy,result,request,responseName); } } } ); } else { awaitResultAsync(optionsCopy,result,request,responseName); } } }
- { @Override public void operationComplete( Future<RedisConnection> future) throws Exception { if (future.isSuccess()) { final RedisConnection c=future.getNow(); Promise<RedisConnection> connectionFuture=connectionManager.newPromise(); connectionManager.getConnectListener().onConnect(connectionFuture,c,null,connectionManager.getConfig()); connectionFuture.addListener(new FutureListener<RedisConnection>(){ @Override public void operationComplete( Future<RedisConnection> future) throws Exception { Future<String> r=c.async(connectionManager.getConfig().getPingTimeout(),RedisCommands.PING); result.put(c,r); latch.countDown(); } } ); } else { latch.countDown(); } } } to { @Override public void operationComplete( Future<RedisConnection> future) throws Exception { if (future.isSuccess()) { final RedisConnection c=future.getNow(); RPromise<RedisConnection> connectionFuture=connectionManager.newPromise(); connectionManager.getConnectListener().onConnect(connectionFuture,c,null,connectionManager.getConfig()); connectionFuture.addListener(new FutureListener<RedisConnection>(){ @Override public void operationComplete( Future<RedisConnection> future) throws Exception { RFuture<String> r=c.async(connectionManager.getConfig().getPingTimeout(),RedisCommands.PING); result.put(c,r); latch.countDown(); } } ); } else { latch.countDown(); } } }
- Rename Variable