Package org.apache.storm.utils
Class Utils
java.lang.Object
org.apache.storm.utils.Utils
- 
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | | A thread that can answer if it is sleeping in the case of simulated time.
static class | |
- 
Field Summary
Fields
- 
Constructor Summary
Constructors
- 
Method Summary
Modifier and Type | Method | Description
static void | addShutdownHookWithDelayedForceKill(Runnable func, int numSecs) | Adds the user supplied function as a shutdown hook for cleanup.
static void | | Adds the user supplied function as a shutdown hook for cleanup.
static StormTopology | addVersions(StormTopology topology) | Add version information to the given topology.
static Utils.SmartThread | | Convenience method used when only the function is given.
static Utils.SmartThread | asyncLoop(Callable afn, boolean isDaemon, Thread.UncaughtExceptionHandler eh, int priority, boolean isFactory, boolean startImmediately, String threadName) | Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.
static Utils.SmartThread | asyncLoop(Callable afn, String threadName, Thread.UncaughtExceptionHandler eh) | Convenience method used when only the function and name suffix are given.
static long | |
static long | bitXorVals(List<Long> coll) |
static boolean | checkDirExists(String dir) |
static boolean | checkFileExists(String path) |
static <V> ArrayList<V> | convertToArray(Map<Integer, V> srcMap, int start) |
static <T> T | deserialize(byte[] serialized, Class<T> clazz) |
static <T> T | deserializeFromString(String str, Class<T> clazz) | Deserialize an object stored in a string.
static boolean | exceptionCauseIsInstanceOf(Class klass, Throwable throwable) | Checks if a throwable is an instance of a particular class.
static void | exitProcess(int val, String msg) |
| findAndReadConfigFile(String name) |
| findAndReadConfigFile(String name, boolean mustExist) |
| findComponentCycles(StormTopology topology, String topoId) | Find and return components cycles in the topology graph when starting from spout.
static <T> T | findOne(IPredicate<T> pred, Collection<T> coll) | Find the first item of coll for which pred.test(...) returns true.
static <T,U> T | findOne(IPredicate<T> pred, Map<U, T> map) |
| findResources(String name) |
static void | forceDelete(String path) | Deletes a file or directory and its contents if it exists.
protected void | forceDeleteImpl(String path) |
| fromCompressedJsonConf(byte[] serialized) |
static <S,T> T | |
static NavigableMap<String,IVersionInfo> | getAlternativeVersionsMap(Map<String, Object> conf) | Get a mapping of the configured supported versions of storm to their actual versions.
static int | | Shortcut to calling getAvailablePort(int) with 0 as the preferred port.
static int | getAvailablePort(int preferredPort) | Gets an available port.
static ClientBlobStore | getClientBlobStore(Map<String, Object> conf) |
static <T> T | getCompatibleVersion(NavigableMap<SimpleVersion, T> versionedMap, SimpleVersion desiredVersion, String what, T defaultValue) |
static ComponentCommon | getComponentCommon(StormTopology topology, String id) |
static Object | getConfiguredClass(Map<String, Object> conf, Object configKey) | Return a new instance of a pluggable specified in the conf.
static NavigableMap<SimpleVersion,List<String>> | | Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
static NavigableMap<SimpleVersion,String> | | Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
static NavigableMap<SimpleVersion,String> | | Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
static GlobalStreamId | getGlobalStreamId(String componentId, String streamId) |
static Object | |
static org.apache.storm.shade.org.apache.zookeeper.data.ACL | getSuperUserAcl(Map<String, Object> conf) | Get the ACL for nimbus/supervisor.
static String | getTopologyId(String name, Nimbus.Iface client) |
static TopologyInfo | |
static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> | getWorkerACL(Map<String, Object> conf) | Get the ZK ACLs that a worker should use when writing to ZK.
static byte[] | gunzip(byte[] data) |
static byte[] | gzip(byte[] data) |
static void | |
static void | handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker) | Handles uncaught exceptions.
static void | |
static String | hostname() | Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
protected String | |
| integerDivided(int sum, int numPieces) |
static boolean | isLocalhostAddress(String address) |
static boolean | |
static boolean | isSystemId(String id) |
static boolean | isValidConf(Map<String, Object> topoConfIn) |
static boolean | isValidKey(String key) | Validates blob key.
static boolean | | Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
static boolean | | Is the topology configured to have ZooKeeper authentication.
static <T> T | javaDeserialize(byte[] serialized, Class<T> clazz) |
static byte[] | javaSerialize(Object obj) |
static <T> String | |
static String | |
protected String | |
static Utils.UptimeComputer | |
static String | |
static <K,V> Map<K, V> | |
static Runnable | |
static double | nullToZero(Double v) |
static <V> V | OR(V a, V b) | a or b the first one that is not null.
static Double | parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue) | parses the arguments to extract jvm heap memory size in MB.
static org.apache.storm.shade.org.apache.zookeeper.data.Id | |
| partitionFixed(int maxNumChunks, Collection<T> coll) | Fills up chunks out of a collection (given a maximum amount of chunks).
static String | | Get process PID.
static void | readAndLogStream(String prefix, InputStream in) |
static Object | readYamlFile(String yamlFile) |
| redactValue(Map<String, Object> m, String key) | Creates a new map with a string value in the map replaced with an equivalently-lengthed string of '#'.
static void | |
| reverseMap(List<List<Object>> listSeq) | "[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 :c}" Reverses an assoc-list style Map like reverseMap(Map...)
| reverseMap(Map<K, V> map) | "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}".
static long | |
static byte[] | |
static String | serializeToString(Object obj) | Serialize an object using the configured serialization and then base64 encode it into a string.
static void | |
static Utils | setInstance(Utils u) | Provide an instance of this class for delegates to use.
static void | |
static void | |
static void | sleep(long millis) |
static void | sleepNoSimulation(long millis) |
static String | | Gets some information, including stack trace, for a running thread.
static <T> T | thriftDeserialize(Class<T> c, byte[] b) |
static <T> T | thriftDeserialize(Class<T> c, byte[] b, int offset, int length) |
static byte[] | thriftSerialize(org.apache.storm.thrift.TBase t) |
static byte[] | toByteArray(ByteBuffer buffer) |
static byte[] | toCompressedJsonConf(Map<String, Object> topoConf) |
static int | toPositive(int number) | A cheap way to deterministically convert a number to a positive value.
static <T extends Throwable> void | unwrapAndThrow(Class<T> klass, Throwable t) |
static <T extends Throwable> T | |
static String | | URL decode the given string using the UTF-8 charset.
static String | | URL encode the given string using the UTF-8 charset.
static String | uuid() |
static void | validateCycleFree(StormTopology topology, String name) | Validate that the topology is cycle free.
static void | validateTopologyBlobStoreMap(Map<String, Object> topoConf) | Validate topology blobstore map.
static void | validateTopologyBlobStoreMap(Map<String, Object> topoConf, BlobStore blobStore) | Validate topology blobstore map.
static void | validateTopologyBlobStoreMap(Map<String, Object> topoConf, NimbusBlobStore client) | Validate topology blobstore map.
static void | validateTopologyName(String name) | Validates topology name.
static RuntimeException | |
static double | zeroIfNaNOrInf(double x) |
- 
Field Details
- 
LOG
public static final org.slf4j.Logger LOG
- 
DEFAULT_STREAM_ID
 
- 
BLOB_KEY_PATTERN
 
- 
- 
Constructor Details
- 
Utils
public Utils()
 
- 
- 
Method Details
- 
setInstance
Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.
- Parameters:
- u - a Utils instance
- Returns:
- the previously set instance
 
- 
setClassLoaderForJavaDeSerialize
- 
resetClassLoaderForJavaDeSerialize
public static void resetClassLoaderForJavaDeSerialize()
- 
findResources
- 
findAndReadConfigFile
- 
findAndReadConfigFile
- 
readDefaultConfig
- 
urlEncodeUtf8
URL encode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLEncoder.encode(String, Charset) instead, which obsoletes this method.
- 
urlDecodeUtf8
URL decode the given string using the UTF-8 charset. Once Storm is baselined to Java 11, we can use URLDecoder.decode(String, Charset) instead, which obsoletes this method.
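As an illustration of what urlEncodeUtf8 and urlDecodeUtf8 describe, the sketch below wraps the JDK overloads that take the charset by name. It is a minimal stand-in, not Storm's source: the class name UrlUtf8Sketch and the choice to rethrow as IllegalStateException are assumptions.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the two helpers; not Storm's implementation.
final class UrlUtf8Sketch {
    static String urlEncodeUtf8(String s) {
        try {
            // Older overload: the charset is passed by name and a checked exception is declared.
            return URLEncoder.encode(s, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // Every JVM must support UTF-8, so this branch is not expected to run.
            throw new IllegalStateException(e);
        }
    }

    static String urlDecodeUtf8(String s) {
        try {
            return URLDecoder.decode(s, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String encoded = urlEncodeUtf8("a b&c");
        System.out.println(encoded);                // a+b%26c
        System.out.println(urlDecodeUtf8(encoded)); // a b&c
    }
}
```

The overloads that accept a Charset directly declare no checked exception, which is why helpers like these become unnecessary once they are available.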
- 
readCommandLineOpts
- 
readStormConfig
- 
bitXorVals
- 
bitXor
- 
addShutdownHookWithForceKillIn1Sec
Adds the user-supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for one second and then halts the runtime, to avoid leaving a zombie process if the cleanup function hangs.
- 
addShutdownHookWithDelayedForceKill
Adds the user-supplied function as a shutdown hook for cleanup. Also adds a function that sleeps for numSecs seconds and then halts the runtime, to avoid leaving a zombie process if the cleanup function hangs.
- 
isSystemId
- 
asyncLoop
public static Utils.SmartThread asyncLoop(Callable afn, boolean isDaemon, Thread.UncaughtExceptionHandler eh, int priority, boolean isFactory, boolean startImmediately, String threadName)
Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous call.
The given afn may be a Callable that returns the number of seconds to sleep, or it may be a Callable that returns another Callable that in turn returns the number of seconds to sleep. In the latter case, isFactory must be true.
- Parameters:
- afn - the code to call on each iteration
- isDaemon - whether the new thread should be a daemon thread
- eh - code to call when afn throws an exception
- priority - the new thread's priority
- isFactory - whether afn returns a callable instead of sleep seconds
- startImmediately - whether to start the thread before returning
- threadName - a suffix to be appended to the thread name
- Returns:
- the newly created thread
 
- 
asyncLoop
public static Utils.SmartThread asyncLoop(Callable afn, String threadName, Thread.UncaughtExceptionHandler eh)
Convenience method used when only the function and name suffix are given.
- Parameters:
- afn - the code to call on each iteration
- threadName - a suffix to be appended to the thread name
- Returns:
- the newly created thread
 
- 
asyncLoop
Convenience method used when only the function is given.
- Parameters:
- afn - the code to call on each iteration
- Returns:
- the newly created thread
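The looping behaviour described for asyncLoop (call the function, sleep for the number of seconds it returned, repeat) can be sketched with a plain Thread. This is only an illustration: the class AsyncLoopSketch, the reduced parameter list, and the rule that a null return value ends the loop are assumptions of the sketch, and it returns a plain Thread rather than a Utils.SmartThread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the looping behaviour; not Storm's implementation.
final class AsyncLoopSketch {
    static Thread asyncLoop(Callable<Long> afn, boolean isDaemon, String threadName) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Long sleepSecs = afn.call();
                    if (sleepSecs == null) {
                        return; // assumption of this sketch: null ends the loop
                    }
                    if (sleepSecs > 0) {
                        Thread.sleep(sleepSecs * 1000L);
                    }
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop and preserve the flag.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Propagates to the thread's uncaught exception handler.
                throw new RuntimeException(e);
            }
        }, threadName);
        t.setDaemon(isDaemon);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Runs three times without sleeping, then returns null to stop.
        Thread t = asyncLoop(() -> calls.incrementAndGet() < 3 ? 0L : null, true, "demo-loop");
        t.join();
        System.out.println(calls.get()); // 3
    }
}
```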
 
- 
exceptionCauseIsInstanceOf
Checks if a throwable is an instance of a particular class.
- Parameters:
- klass - The class you're expecting
- throwable - The throwable you expect to be an instance of klass
- Returns:
- true if throwable is instance of klass, false otherwise.
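A check of this kind can be sketched as a walk along the getCause() chain. The method name suggests that causes are inspected as well as the throwable itself, but that is an assumption here; the class CauseCheckSketch is hypothetical and not Storm's source.

```java
import java.io.IOException;

// Hypothetical sketch; assumes the check follows getCause() links.
final class CauseCheckSketch {
    static boolean exceptionCauseIsInstanceOf(Class<?> klass, Throwable throwable) {
        // Inspect the throwable itself, then each cause in turn.
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (klass.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new IOException("disk full"));
        System.out.println(exceptionCauseIsInstanceOf(IOException.class, wrapped));           // true
        System.out.println(exceptionCauseIsInstanceOf(IllegalStateException.class, wrapped)); // false
    }
}
```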
 
- 
unwrapTo
- 
unwrapAndThrow
- Throws:
- T extends Throwable
 
- 
wrapInRuntime
- 
secureRandomLong
public static long secureRandomLong()
- 
hostname
Gets the storm.local.hostname value, or tries to figure out the local hostname if it is not set in the config.
- Returns:
- a string representation of the hostname.
- Throws:
- UnknownHostException
 
- 
localHostname
- Throws:
- UnknownHostException
 
- 
exitProcess
- 
uuid
- 
javaSerialize
- 
javaDeserialize
- 
get
- 
zeroIfNaNOrInf
public static double zeroIfNaNOrInf(double x)
- 
join
- 
parseZkId
- 
getSuperUserAcl
public static org.apache.storm.shade.org.apache.zookeeper.data.ACL getSuperUserAcl(Map<String, Object> conf)
Get the super user ACL for nimbus/supervisor. This assumes that security is enabled.
- Parameters:
- conf - the config to get the super user ACL from
- Returns:
- the super user ACL.
 
- 
getWorkerACL
public static List<org.apache.storm.shade.org.apache.zookeeper.data.ACL> getWorkerACL(Map<String, Object> conf)
Get the ZK ACLs that a worker should use when writing to ZK.
- Parameters:
- conf - the config for the topology.
- Returns:
- the ACLs
 
- 
isZkAuthenticationConfiguredTopology
Checks whether the topology is configured to use ZooKeeper authentication.
- Parameters:
- conf - the topology configuration
- Returns:
- true if ZooKeeper authentication is configured, else false
 
- 
handleUncaughtException
public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker)
Handles uncaught exceptions.
- Parameters:
- worker - true if this is for handling worker exceptions
 
- 
handleUncaughtException
- 
handleWorkerUncaughtException
- 
thriftSerialize
public static byte[] thriftSerialize(org.apache.storm.thrift.TBase t)
- 
thriftDeserialize
- 
thriftDeserialize
- 
sleepNoSimulation
public static void sleepNoSimulation(long millis)
- 
sleep
public static void sleep(long millis)
- 
makeUptimeComputer
- 
reverseMap
"{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}".
Example usage in Java: Map<Integer, String> tasks; Map<String, List<Integer>> componentTasks = Utils.reverseMap(tasks);
The order of the resulting list values depends on the ordering properties of the Map passed in. The caller is responsible for passing an ordered map if they expect the result to be consistently ordered as well.
- Parameters:
- map - to reverse
- Returns:
- a reversed map
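The inversion can be pictured as grouping keys by their value. The sketch below is an illustration under that reading, not Storm's source: the class ReverseMapSketch and the use of a HashMap for the result are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of inverting a map into value -> list of keys.
final class ReverseMapSketch {
    static <K, V> Map<V, List<K>> reverseMap(Map<K, V> map) {
        Map<V, List<K>> reversed = new HashMap<>();
        if (map == null) {
            return reversed;
        }
        for (Map.Entry<K, V> e : map.entrySet()) {
            // Keys are appended in the iteration order of the input map.
            reversed.computeIfAbsent(e.getValue(), v -> new ArrayList<>()).add(e.getKey());
        }
        return reversed;
    }

    public static void main(String[] args) {
        // A LinkedHashMap gives a predictable order inside each list.
        Map<Integer, String> tasks = new LinkedHashMap<>();
        tasks.put(1, "spout");
        tasks.put(2, "bolt");
        tasks.put(3, "bolt");
        Map<String, List<Integer>> componentTasks = reverseMap(tasks);
        System.out.println(componentTasks.get("bolt")); // [2, 3]
    }
}
```

With an unordered input such as a HashMap, the lists hold the same elements but their order is not guaranteed, which is the caveat the description raises.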
 
- 
reverseMap
"[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 :c}"
Reverses an assoc-list style Map like reverseMap(Map...)
- Parameters:
- listSeq - to reverse
- Returns:
- a reversed map
 
- 
isOnWindows
public static boolean isOnWindows()
- 
checkFileExists
- 
forceDelete
Deletes a file or directory and its contents if it exists. Does not complain if the input is null or does not exist.
- Parameters:
- path - the path to the file or directory
- Throws:
- IOException
 
- 
serialize
- 
deserialize
- 
serializeToString
Serialize an object using the configured serialization and then base64 encode it into a string.
- Parameters:
- obj - the object to encode
- Returns:
- a string with the encoded object in it.
 
- 
deserializeFromString
Deserialize an object stored in a string. The String is assumed to be a base64 encoded string containing the bytes to actually deserialize.
- Parameters:
- str - the encoded string.
- clazz - the thrift class we are expecting.
- Returns:
- the decoded object
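The pairing of serializeToString and deserializeFromString amounts to "serialize to bytes, then base64" and the reverse. The sketch below shows that shape with java.io serialization swapped in for Storm's configured serialization, so it is a stand-in rather than the library's behaviour; the class Base64SerializationSketch and its exception handling are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;

// Hypothetical sketch: java.io serialization stands in for the configured serializer.
final class Base64SerializationSketch {
    static String serializeToString(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Base64 turns the raw bytes into text that is safe to store in a string.
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    static <T> T deserializeFromString(String str, Class<T> clazz) {
        byte[] raw = Base64.getDecoder().decode(str);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            // cast() fails fast if the decoded object is not of the expected class.
            return clazz.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String encoded = serializeToString("hello");
        System.out.println(deserializeFromString(encoded, String.class)); // hello
    }
}
```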
 
- 
toByteArray
- 
mkSuicideFn
- 
readAndLogStream
- 
getComponentCommon
- 
tuple
- 
gzip
public static byte[] gzip(byte[] data)
- 
gunzip
public static byte[] gunzip(byte[] data)
- 
getRepeat
- 
getGlobalStreamId
- 
getSetComponentObject
- 
toPositive
public static int toPositive(int number)
A cheap way to deterministically convert a number to a positive value. When the input is positive, the original value is returned. When the input is negative, the returned value is the original value bitwise ANDed with Integer.MAX_VALUE (0x7fffffff), which is not its absolute value.
- Parameters:
- number - a given number
- Returns:
- a positive number.
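The bit trick described for toPositive can be written in one line. The sketch below (class name ToPositiveSketch is hypothetical) shows how the result differs from Math.abs for negative inputs.

```java
// Hypothetical sketch of the masking described above.
final class ToPositiveSketch {
    static int toPositive(int number) {
        // Clearing the sign bit leaves non-negative inputs unchanged.
        return number & Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(toPositive(42));                // 42
        System.out.println(toPositive(-1));                // 2147483647, whereas Math.abs(-1) is 1
        System.out.println(toPositive(Integer.MIN_VALUE)); // 0
    }
}
```

Masking also behaves sensibly for Integer.MIN_VALUE, where Math.abs would return a negative number, though the result in that case is 0 rather than a strictly positive value.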
 
- 
processPid
Get process PID.
- Returns:
- the pid of this JVM, because Java doesn't provide a real way to do this.
 
- 
fromCompressedJsonConf
- 
redactValue
Creates a new map with a string value in the map replaced with a string of '#' characters of the same length. If the value is not a string, toString() is called on it and the resulting string is redacted.
- Parameters:
- m - The map that a value will be redacted from
- key - The key pointing to the value to be redacted
- Returns:
- a new map with the value redacted. The original map will not be modified.
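A sketch of this kind of redaction follows. It is an illustration only: the class RedactSketch is hypothetical, and copying the map even when the key is absent is a choice made for the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of redacting one value in a copy of a map.
final class RedactSketch {
    static Map<String, Object> redactValue(Map<String, Object> m, String key) {
        // Work on a copy so the caller's map is left untouched.
        Map<String, Object> copy = new HashMap<>(m);
        if (copy.containsKey(key)) {
            // String.valueOf covers non-string and null values.
            char[] mask = new char[String.valueOf(copy.get(key)).length()];
            Arrays.fill(mask, '#');
            copy.put(key, new String(mask));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("password", "secret");
        System.out.println(redactValue(conf, "password").get("password")); // ######
        System.out.println(conf.get("password"));                          // secret
    }
}
```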
 
- 
createDefaultUncaughtExceptionHandler
- 
createWorkerUncaughtExceptionHandler
- 
setupDefaultUncaughtExceptionHandler
public static void setupDefaultUncaughtExceptionHandler()
- 
setupWorkerUncaughtExceptionHandler
public static void setupWorkerUncaughtExceptionHandler()
- 
parseJvmHeapMemByChildOpts
Parses the arguments to extract the JVM heap memory size in MB.
- Returns:
- the value of the JVM heap memory setting (in MB) in a java command.
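One way such parsing could work is to look for an -Xmx option and convert its value to megabytes. The sketch below assumes exactly that: the flag, the accepted unit suffixes, the first-match rule, and the class name HeapOptsSketch are all assumptions and are not taken from Storm's source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch; assumes the heap size comes from the first -Xmx option.
final class HeapOptsSketch {
    private static final Pattern XMX = Pattern.compile("-Xmx(\\d+)([kKmMgG]?)");

    static Double parseJvmHeapMemByChildOpts(List<String> options, Double defaultValue) {
        if (options != null) {
            for (String option : options) {
                if (option == null) {
                    continue;
                }
                Matcher m = XMX.matcher(option);
                if (m.find()) {
                    long value = Long.parseLong(m.group(1));
                    switch (m.group(2).toLowerCase()) {
                        case "k": return value / 1024.0;
                        case "m": return (double) value;
                        case "g": return value * 1024.0;
                        default:  return value / (1024.0 * 1024.0); // no suffix: bytes
                    }
                }
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        System.out.println(parseJvmHeapMemByChildOpts(Arrays.asList("-Xms256m", "-Xmx1g"), 128.0)); // 1024.0
    }
}
```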
 
- 
getClientBlobStore
- 
isValidConf
- 
getTopologyInfo
- 
getTopologyId
- 
validateTopologyBlobStoreMap
public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
- Parameters:
- topoConf - Topology configuration
- Throws:
- InvalidTopologyException
- AuthorizationException
 
- 
validateTopologyBlobStoreMap
public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf, NimbusBlobStore client) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
- Parameters:
- topoConf - Topology configuration
- client - The NimbusBlobStore client. It must call prepare() before being used here.
- Throws:
- InvalidTopologyException
- AuthorizationException
 
- 
validateTopologyBlobStoreMap
public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf, BlobStore blobStore) throws InvalidTopologyException, AuthorizationException
Validate topology blobstore map.
- 
threadDump
Gets some information, including stack trace, for a running thread.
- Returns:
- A human-readable string of the dump.
 
- 
checkDirExists
- 
getConfiguredClass
Return a new instance of a pluggable specified in the conf.
- Parameters:
- conf - The conf to read from.
- configKey - The key pointing to the pluggable class
- Returns:
- an instance of the class or null if it is not specified.
 
- 
isZkAuthenticationConfiguredStormServer
Is the cluster configured to interact with ZooKeeper in a secure way? This only works when called from within Nimbus or a Supervisor process.
- Parameters:
- conf - the storm configuration, not the topology configuration
- Returns:
- true if it is configured else false.
 
- 
toCompressedJsonConf
- 
nullToZero
- 
OR
public static <V> V OR(V a, V b)
Returns a or b, whichever is the first one that is not null.
- Parameters:
- a - something
- b - something else
- Returns:
- a or b, whichever is the first one that is not null
 
- 
integerDivided
- 
partitionFixed
Fills up chunks out of a collection (given a maximum amount of chunks).
For example:
partitionFixed(5, [1,2,3]) -> [[1,2,3]]
partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
- Parameters:
- maxNumChunks - the maximum number of chunks to return
- coll - the collection to be chunked up
- Returns:
- a list of the chunks, which are themselves lists.
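One way to obtain the chunk sizes shown in the nine- and ten-element examples is to split the collection as evenly as possible and give the leading chunks one extra element each. The sketch below does that. It is an illustration under that assumption, not Storm's implementation; the class name PartitionSketch is hypothetical, and for collections smaller than maxNumChunks it yields single-element chunks, which may differ from the library's behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: near-even chunks, larger chunks first.
final class PartitionSketch {
    static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
        List<List<T>> chunks = new ArrayList<>();
        if (maxNumChunks <= 0 || coll == null || coll.isEmpty()) {
            return chunks;
        }
        int base = coll.size() / maxNumChunks;   // minimum size of every chunk
        int extra = coll.size() % maxNumChunks;  // number of chunks that get one more
        Iterator<T> it = coll.iterator();
        for (int i = 0; i < maxNumChunks && it.hasNext(); i++) {
            int size = base + (i < extra ? 1 : 0);
            List<T> chunk = new ArrayList<>(size);
            for (int j = 0; j < size; j++) {
                chunk.add(it.next());
            }
            chunks.add(chunk);
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(partitionFixed(3, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
        // [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
    }
}
```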
 
- 
readYamlFile
- 
getAvailablePort
public static int getAvailablePort(int preferredPort)
Gets an available port. Consider if it is possible to pass port 0 to the server instead of using this method, since there is no guarantee that the port returned by this method will remain free.
- Returns:
- The preferred port if available, or a random available port
 
- 
getAvailablePort
public static int getAvailablePort()
Shortcut to calling getAvailablePort(int) with 0 as the preferred port.
- Returns:
- A random available port
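The two getAvailablePort variants can be sketched with java.net.ServerSocket: try to bind the preferred port, and fall back to port 0 so the operating system picks a free one. The class AvailablePortSketch and the -1 return value when nothing can be bound are assumptions of this sketch, not documented behaviour.

```java
import java.io.IOException;
import java.net.ServerSocket;

// Hypothetical sketch of probing for a free port.
final class AvailablePortSketch {
    static int getAvailablePort(int preferredPort) {
        try (ServerSocket socket = new ServerSocket(preferredPort)) {
            // With port 0 the OS assigns a free port; getLocalPort() reports it.
            return socket.getLocalPort();
        } catch (IOException e) {
            if (preferredPort > 0) {
                return getAvailablePort(0); // preferred port is taken, let the OS choose
            }
            return -1; // assumption of this sketch: signal that no port could be bound
        }
    }

    static int getAvailablePort() {
        return getAvailablePort(0);
    }

    public static void main(String[] args) {
        System.out.println(getAvailablePort());
    }
}
```

The socket is closed before the method returns, so another process can claim the port in the meantime. That is the race the description warns about, and why passing port 0 directly to the server is the safer option when it is possible.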
 
- 
findOne
Find the first item of coll for which pred.test(...) returns true.
- Parameters:
- pred - The IPredicate to test for
- coll - The Collection of items to search through.
- Returns:
- The first matching value in coll, or null if nothing matches.
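The search described for findOne is a linear scan that stops at the first match. In the sketch below, java.util.function.Predicate stands in for Storm's IPredicate, and the class name FindOneSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Predicate;

// Hypothetical sketch; Predicate stands in for IPredicate.
final class FindOneSketch {
    static <T> T findOne(Predicate<T> pred, Collection<T> coll) {
        if (coll == null) {
            return null;
        }
        for (T item : coll) {
            if (pred.test(item)) {
                return item; // first match in iteration order
            }
        }
        return null; // nothing matched
    }

    public static void main(String[] args) {
        System.out.println(findOne(x -> x > 2, Arrays.asList(1, 2, 3, 4)));  // 3
        System.out.println(findOne(x -> x > 10, Arrays.asList(1, 2, 3, 4))); // null
    }
}
```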
 
- 
findOne
- 
parseJson
- 
memoizedLocalHostname
- Throws:
- UnknownHostException
 
- 
addVersions
Add version information to the given topology.
- Parameters:
- topology - the topology being submitted (MIGHT BE MODIFIED)
- Returns:
- topology
 
- 
getConfiguredClasspathVersions
public static NavigableMap<SimpleVersion,List<String>> getConfiguredClasspathVersions(Map<String, Object> conf, List<String> currentClassPath)
Get a map of version to classpath from the conf Config.SUPERVISOR_WORKER_VERSION_CLASSPATH_MAP
- Parameters:
- conf - what to read it out of
- currentClassPath - the current classpath for this version of storm (not included in the conf, but returned by this)
- Returns:
- the map
 
- 
getAlternativeVersionsMap
Get a mapping of the configured supported versions of storm to their actual versions.
- Parameters:
- conf - what to read the configuration out of.
- Returns:
- the map.
 
- 
getConfiguredWorkerMainVersions
public static NavigableMap<SimpleVersion,String> getConfiguredWorkerMainVersions(Map<String, Object> conf)
Get a map of version to worker main from the conf Config.SUPERVISOR_WORKER_VERSION_MAIN_MAP
- Parameters:
- conf - what to read it out of
- Returns:
- the map
 
- 
getConfiguredWorkerLogWriterVersions
public static NavigableMap<SimpleVersion,String> getConfiguredWorkerLogWriterVersions(Map<String, Object> conf)
Get a map of version to worker log writer from the conf Config.SUPERVISOR_WORKER_VERSION_LOGWRITER_MAP
- Parameters:
- conf - what to read it out of
- Returns:
- the map
 
- 
getConfigFromClasspath
public static Map<String,Object> getConfigFromClasspath(List<String> cp, Map<String, Object> conf) throws IOException
- Throws:
- IOException
 
- 
isLocalhostAddress
- 
merge
- 
convertToArray
- 
forceDeleteImpl
- Throws:
- IOException
 
- 
makeUptimeComputerImpl
- 
localHostnameImpl
- Throws:
- UnknownHostException
 
- 
hostnameImpl
- Throws:
- UnknownHostException
 
- 
isValidKey
Validates blob key.
- Parameters:
- key - Key for the blob.
 
- 
validateTopologyName
Validates topology name.
- Parameters:
- name - the topology name
- Throws:
- IllegalArgumentException - if the topology name is not valid
 
- 
findComponentCycles
Find and return component cycles in the topology graph, starting from the spouts. Each cycle may consist of one or more components. Components that cannot be reached from any of the spouts are ignored.
- Returns:
- a List of cycles. Each cycle has a list of component names.
 
- 
validateCycleFree
public static void validateCycleFree(StormTopology topology, String name) throws InvalidTopologyException
Validate that the topology is cycle free. If not, then throw an InvalidTopologyException describing the cycle(s).
- Parameters:
- topology - StormTopology instance to examine.
- name - Name of the topology, used in exception error message.
- Throws:
- InvalidTopologyException - if there are cycles, with message describing the cycles encountered.
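The idea of looking for cycles reachable from the spouts can be sketched as a depth-first search over a component graph. The sketch below only reports whether a cycle exists rather than listing the cycles, and its graph representation (a map from component id to downstream component ids), the class CycleCheckSketch, and the method hasCycle are all hypothetical; it does not use StormTopology.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: DFS from the spouts, reporting whether a cycle is reachable.
final class CycleCheckSketch {
    static boolean hasCycle(Map<String, List<String>> graph, Collection<String> spouts) {
        Set<String> visited = new HashSet<>();
        Set<String> onPath = new HashSet<>();
        for (String spout : spouts) {
            if (visit(spout, graph, visited, onPath)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String node, Map<String, List<String>> graph,
                                 Set<String> visited, Set<String> onPath) {
        if (onPath.contains(node)) {
            return true; // node is already on the current path: a cycle
        }
        if (!visited.add(node)) {
            return false; // fully explored earlier without finding a cycle
        }
        onPath.add(node);
        for (String next : graph.getOrDefault(node, Collections.emptyList())) {
            if (visit(next, graph, visited, onPath)) {
                return true;
            }
        }
        onPath.remove(node);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("spout", Arrays.asList("a"));
        graph.put("a", Arrays.asList("b"));
        graph.put("b", Arrays.asList("a"));
        System.out.println(hasCycle(graph, Arrays.asList("spout"))); // true
    }
}
```

Because the search starts only at the spouts, a cycle among components that no spout can reach is never visited, matching the note that unreachable components are ignored.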
 
 
-