org.apache.bookkeeper.common.util.OrderedScheduler to org.apache.bookkeeper.common.util.OrderedExecutor
No. of Instances - 16
No. of Commits - 2
No. of Projects - {'pulsar'}
Hierarchy/Composition: -
Primitive Info: -
NameSpace: External -> External
Mapping:
- Rename Method invocation
- Add or Remove Method invocation
- dataCache.get(path,(p,executor) -> { CompletableFuture<Entry<Object,Stat>> zkFuture=new CompletableFuture<>(); try { this.zkSession.get().getData(path,watcher,(rc,path1,ctx,content,stat) -> { Executor exec=scheduledExecutor != null ? scheduledExecutor : executor; if (rc == Code.OK.intValue()) { try { T obj=deserializer.deserialize(path,content); exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object,Stat>(obj,stat))); } catch ( Exception e) { exec.execute(() -> zkFuture.completeExceptionally(e)); } } else if (rc == Code.NONODE.intValue()) { exec.execute(() -> zkFuture.complete(null)); } else { exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); } } ,null); } catch ( Exception e) { LOG.warn("Failed to access zkSession for {} {}",path,e.getMessage(),e); zkFuture.completeExceptionally(e); } return zkFuture; } ).thenAccept(result -> { if (result != null) { future.complete(Optional.of((Entry<T,Stat>)result)); } else { future.complete(Optional.empty()); } } ) to dataCache.get(path,(p,executor) -> { CompletableFuture<Entry<Object,Stat>> zkFuture=new CompletableFuture<>(); try { this.zkSession.get().getData(path,watcher,(rc,path1,ctx,content,stat) -> { if (rc == Code.OK.intValue()) { try { T obj=deserializer.deserialize(path,content); executor.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object,Stat>(obj,stat))); } catch ( Exception e) { executor.execute(() -> zkFuture.completeExceptionally(e)); } } else if (rc == Code.NONODE.intValue()) { executor.execute(() -> zkFuture.complete(null)); } else { executor.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc))); } } ,null); } catch ( Exception e) { LOG.warn("Failed to access zkSession for {} {}",path,e.getMessage(),e); zkFuture.completeExceptionally(e); } return zkFuture; } ).thenAccept(result -> { if (result != null) { future.complete(Optional.of((Entry<T,Stat>)result)); } else { future.complete(Optional.empty()); } } )
- executor.submit(safeRun(() -> { if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info=parseManagedLedgerInfo(readData); info=updateMLInfoTimestamp(info); callback.operationComplete(info,new ZKStat(stat)); } catch ( ParseException|InvalidProtocolBufferException e) { callback.operationFailed(new MetaStoreException(e)); } } else if (rc == Code.NONODE.intValue()) { log.info("Creating '{}{}'",prefix,ledgerName); StringCallback createcb=(rc1,path1,ctx1,name) -> { if (rc1 == Code.OK.intValue()) { ManagedLedgerInfo info=ManagedLedgerInfo.getDefaultInstance(); callback.operationComplete(info,new ZKStat()); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1)))); } } ; ZkUtils.asyncCreateFullPathOptimistic(zk,prefix + ledgerName,new byte[0],Acl,CreateMode.PERSISTENT,createcb,null); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info=parseManagedLedgerInfo(readData); info=updateMLInfoTimestamp(info); callback.operationComplete(info,new ZKStat(stat)); } catch ( ParseException|InvalidProtocolBufferException e) { callback.operationFailed(new MetaStoreException(e)); } } else if (rc == Code.NONODE.intValue()) { log.info("Creating '{}{}'",prefix,ledgerName); StringCallback createcb=(rc1,path1,ctx1,name) -> { if (rc1 == Code.OK.intValue()) { ManagedLedgerInfo info=ManagedLedgerInfo.getDefaultInstance(); callback.operationComplete(info,new ZKStat()); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1)))); } } ; ZkUtils.asyncCreateFullPathOptimistic(zk,prefix + ledgerName,new byte[0],Acl,CreateMode.PERSISTENT,createcb,null); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } } ))
- executor.submit(safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}",ledgerName,Code.get(rc),stat != null ? stat.getVersion() : "null"); } MetaStoreException status=null; if (rc == Code.BADVERSION.intValue()) { status=new BadVersionException(KeeperException.create(Code.get(rc))); callback.operationFailed(status); } else if (rc != Code.OK.intValue()) { status=new MetaStoreException(KeeperException.create(Code.get(rc))); callback.operationFailed(status); } else { callback.operationComplete(null,new ZKStat(stat1)); } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}",ledgerName,Code.get(rc),stat != null ? stat.getVersion() : "null"); } MetaStoreException status=null; if (rc == Code.BADVERSION.intValue()) { status=new BadVersionException(KeeperException.create(Code.get(rc))); callback.operationFailed(status); } else if (rc != Code.OK.intValue()) { status=new MetaStoreException(KeeperException.create(Code.get(rc))); callback.operationFailed(status); } else { callback.operationComplete(null,new ZKStat(stat1)); } } ))
- executor.submit(safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}",ledgerName,Code.get(rc),children); } if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); return; } if (log.isDebugEnabled()) { log.debug("[{}] Get childrend completed version={}",ledgerName,stat.getVersion()); } callback.operationComplete(children,new ZKStat(stat)); } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}",ledgerName,Code.get(rc),children); } if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); return; } if (log.isDebugEnabled()) { log.debug("[{}] Get childrend completed version={}",ledgerName,stat.getVersion()); } callback.operationComplete(children,new ZKStat(stat)); } ))
- executor.submit(safeRun(() -> { if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { try { ManagedCursorInfo info=parseManagedCursorInfo(data); callback.operationComplete(info,new ZKStat(stat)); } catch ( ParseException|InvalidProtocolBufferException e) { callback.operationFailed(new MetaStoreException(e)); } } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { try { ManagedCursorInfo info=parseManagedCursorInfo(data); callback.operationComplete(info,new ZKStat(stat)); } catch ( ParseException|InvalidProtocolBufferException e) { callback.operationFailed(new MetaStoreException(e)); } } } ))
- executor.submit(safeRun(() -> { if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ",ledgerName,cursorName,info,Code.get(rc)); callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { if (log.isDebugEnabled()) { log.debug("[{}] Created consumer {} on meta-data store with {}",ledgerName,cursorName,info); } callback.operationComplete(null,new ZKStat()); } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ",ledgerName,cursorName,info,Code.get(rc)); callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { if (log.isDebugEnabled()) { log.debug("[{}] Created consumer {} on meta-data store with {}",ledgerName,cursorName,info); } callback.operationComplete(null,new ZKStat()); } } ))
- executor.submit(safeRun(() -> { if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { callback.operationComplete(null,new ZKStat(stat1)); } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { callback.operationComplete(null,new ZKStat(stat1)); } } ))
- executor.submit(safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}",ledgerName,consumerName,Code.get(rc)); } if (rc == Code.OK.intValue()) { callback.operationComplete(null,null); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}",ledgerName,consumerName,Code.get(rc)); } if (rc == Code.OK.intValue()) { callback.operationComplete(null,null); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } } ))
- executor.submit(safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}",ledgerName,Code.get(rc)); } if (rc == Code.OK.intValue()) { callback.operationComplete(null,null); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } } )) to executor.executeOrdered(ledgerName,safeRun(() -> { if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}",ledgerName,Code.get(rc)); } if (rc == Code.OK.intValue()) { callback.operationComplete(null,null); } else { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } } ))
- OrderedScheduler.newSchedulerBuilder().numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic()).name("broker-np-topic-workers").build() to OrderedScheduler.newSchedulerBuilder().numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic()).name("broker-topic-workers").build()