org.redisson.api.RTopic<java.lang.Object> to org.redisson.api.RTopic
No. of Instances - 8
No. of Commits - 1
No. of Projects - {'redisson'}
Hierarchy/Composition: -
Primitive Info: -
NameSpace: Internal -> Internal
Mapping:
- Cascading Type Change (Similar)
- Update Anonymous class
- { @Override public void operationComplete( Future<Void> future) throws Exception { RedissonBatch publishBatch=new RedissonBatch(null,commandExecutor.getConnectionManager(),BatchOptions.defaults()); for ( final Entry<HashKey,HashValue> entry : hashes.entrySet()) { String disabledKeysName=RedissonObject.suffixName(entry.getKey().getName(),RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); RMultimapCacheAsync<LocalCachedMapDisabledKey,String> multimap=publishBatch.getListMultimapCache(disabledKeysName,entry.getKey().getCodec()); LocalCachedMapDisabledKey localCacheKey=new LocalCachedMapDisabledKey(requestId,options.getResponseTimeout()); multimap.removeAllAsync(localCacheKey); RTopicAsync<Object> topic=publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(),RedissonLocalCachedMap.TOPIC_SUFFIX),LocalCachedMessageCodec.INSTANCE); RFuture<Long> publishFuture=topic.publishAsync(new LocalCachedMapDisable(requestId,entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]),options.getResponseTimeout())); publishFuture.addListener(new FutureListener<Long>(){ @Override public void operationComplete( Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } int receivers=future.getNow().intValue(); AtomicInteger counter=entry.getValue().getCounter(); if (counter.addAndGet(receivers) == 0) { listener.decCounter(); } } } ); } RFuture<BatchResult<?>> publishFuture=publishBatch.executeAsync(); publishFuture.addListener(new FutureListener<BatchResult<?>>(){ @Override public void operationComplete( Future<BatchResult<?>> future) throws Exception { result.addListener(new FutureListener<Map<HashKey,HashValue>>(){ @Override public void operationComplete( Future<Map<HashKey,HashValue>> future) throws Exception { for ( RTopic<Object> topic : topics) { topic.removeAllListeners(); } } } ); if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } commandExecutor.getConnectionManager().newTimeout(new TimerTask(){ @Override public void run( Timeout timeout) throws Exception { result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms")); } } ,options.getResponseTimeout(),TimeUnit.MILLISECONDS); } } ); } } to { @Override public void operationComplete( Future<Void> future) throws Exception { RedissonBatch publishBatch=new RedissonBatch(null,commandExecutor.getConnectionManager(),BatchOptions.defaults()); for ( final Entry<HashKey,HashValue> entry : hashes.entrySet()) { String disabledKeysName=RedissonObject.suffixName(entry.getKey().getName(),RedissonLocalCachedMap.DISABLED_KEYS_SUFFIX); RMultimapCacheAsync<LocalCachedMapDisabledKey,String> multimap=publishBatch.getListMultimapCache(disabledKeysName,entry.getKey().getCodec()); LocalCachedMapDisabledKey localCacheKey=new LocalCachedMapDisabledKey(requestId,options.getResponseTimeout()); multimap.removeAllAsync(localCacheKey); RTopicAsync topic=publishBatch.getTopic(RedissonObject.suffixName(entry.getKey().getName(),RedissonLocalCachedMap.TOPIC_SUFFIX),LocalCachedMessageCodec.INSTANCE); RFuture<Long> publishFuture=topic.publishAsync(new LocalCachedMapDisable(requestId,entry.getValue().getKeyIds().toArray(new byte[entry.getValue().getKeyIds().size()][]),options.getResponseTimeout())); publishFuture.addListener(new FutureListener<Long>(){ @Override public void operationComplete( Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } int receivers=future.getNow().intValue(); AtomicInteger counter=entry.getValue().getCounter(); if (counter.addAndGet(receivers) == 0) { listener.decCounter(); } } } ); } RFuture<BatchResult<?>> publishFuture=publishBatch.executeAsync(); publishFuture.addListener(new FutureListener<BatchResult<?>>(){ @Override public void operationComplete( Future<BatchResult<?>> future) throws Exception { result.addListener(new FutureListener<Map<HashKey,HashValue>>(){ @Override public void operationComplete( Future<Map<HashKey,HashValue>> future) throws Exception { for ( RTopic topic : topics) { topic.removeAllListeners(); } } } ); if (!future.isSuccess()) { result.tryFailure(future.cause()); return; } commandExecutor.getConnectionManager().newTimeout(new TimerTask(){ @Override public void run( Timeout timeout) throws Exception { result.tryFailure(new TransactionTimeoutException("Unable to execute transaction within " + options.getResponseTimeout() + "ms")); } } ,options.getResponseTimeout(),TimeUnit.MILLISECONDS); } } ); } }
- Cascading Type Change (Different)
- Add or Remove Method invocation
- invalidationTopic.addListener(new MessageListener<Object>(){ @Override public void onMessage( CharSequence channel, Object msg){ if (msg instanceof LocalCachedMapDisable) { LocalCachedMapDisable m=(LocalCachedMapDisable)msg; String requestId=m.getRequestId(); Set<CacheKey> keysToDisable=new HashSet<CacheKey>(); for ( byte[] keyHash : ((LocalCachedMapDisable)msg).getKeyHashes()) { CacheKey key=new CacheKey(keyHash); keysToDisable.add(key); } disableKeys(requestId,keysToDisable,m.getTimeout()); RedissonTopic topic=new RedissonTopic(LocalCachedMessageCodec.INSTANCE,commandExecutor,RedissonObject.suffixName(name,requestId + DISABLED_ACK_SUFFIX)); topic.publishAsync(new LocalCachedMapDisableAck()); } if (msg instanceof LocalCachedMapEnable) { LocalCachedMapEnable m=(LocalCachedMapEnable)msg; for ( byte[] keyHash : m.getKeyHashes()) { CacheKey key=new CacheKey(keyHash); disabledKeys.remove(key,m.getRequestId()); } } if (msg instanceof LocalCachedMapClear) { cache.clear(); } if (msg instanceof LocalCachedMapInvalidate) { LocalCachedMapInvalidate invalidateMsg=(LocalCachedMapInvalidate)msg; if (!Arrays.equals(invalidateMsg.getExcludedId(),instanceId)) { for ( byte[] keyHash : invalidateMsg.getKeyHashes()) { CacheKey key=new CacheKey(keyHash); cache.remove(key); } } } if (msg instanceof LocalCachedMapUpdate) { LocalCachedMapUpdate updateMsg=(LocalCachedMapUpdate)msg; for ( LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) { ByteBuf keyBuf=Unpooled.wrappedBuffer(entry.getKey()); ByteBuf valueBuf=Unpooled.wrappedBuffer(entry.getValue()); try { updateCache(keyBuf,valueBuf); } catch ( IOException e) { log.error("Can't decode map entry",e); } finally { keyBuf.release(); valueBuf.release(); } } } if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) { lastInvalidate=System.currentTimeMillis(); } } } ) to invalidationTopic.addListener(Object.class,new MessageListener<Object>(){ @Override public void onMessage( CharSequence channel, Object msg){ if (msg instanceof LocalCachedMapDisable) { LocalCachedMapDisable m=(LocalCachedMapDisable)msg; String requestId=m.getRequestId(); Set<CacheKey> keysToDisable=new HashSet<CacheKey>(); for ( byte[] keyHash : ((LocalCachedMapDisable)msg).getKeyHashes()) { CacheKey key=new CacheKey(keyHash); keysToDisable.add(key); } disableKeys(requestId,keysToDisable,m.getTimeout()); RedissonTopic topic=new RedissonTopic(LocalCachedMessageCodec.INSTANCE,commandExecutor,RedissonObject.suffixName(name,requestId + DISABLED_ACK_SUFFIX)); topic.publishAsync(new LocalCachedMapDisableAck()); } if (msg instanceof LocalCachedMapEnable) { LocalCachedMapEnable m=(LocalCachedMapEnable)msg; for ( byte[] keyHash : m.getKeyHashes()) { CacheKey key=new CacheKey(keyHash); disabledKeys.remove(key,m.getRequestId()); } } if (msg instanceof LocalCachedMapClear) { cache.clear(); } if (msg instanceof LocalCachedMapInvalidate) { LocalCachedMapInvalidate invalidateMsg=(LocalCachedMapInvalidate)msg; if (!Arrays.equals(invalidateMsg.getExcludedId(),instanceId)) { for ( byte[] keyHash : invalidateMsg.getKeyHashes()) { CacheKey key=new CacheKey(keyHash); cache.remove(key); } } } if (msg instanceof LocalCachedMapUpdate) { LocalCachedMapUpdate updateMsg=(LocalCachedMapUpdate)msg; for ( LocalCachedMapUpdate.Entry entry : updateMsg.getEntries()) { ByteBuf keyBuf=Unpooled.wrappedBuffer(entry.getKey()); ByteBuf valueBuf=Unpooled.wrappedBuffer(entry.getValue()); try { updateCache(keyBuf,valueBuf); } catch ( IOException e) { log.error("Can't decode map entry",e); } finally { keyBuf.release(); valueBuf.release(); } } } if (options.getReconnectionStrategy() == ReconnectionStrategy.LOAD) { lastInvalidate=System.currentTimeMillis(); } } } )