Package org.apache.storm.utils
Class ServerUtils
java.lang.Object
org.apache.storm.utils.ServerUtils
- 
Field SummaryFieldsModifier and TypeFieldDescriptionstatic final booleanstatic final org.slf4j.Loggerstatic final intstatic final int
- 
Constructor SummaryConstructors
- 
Method SummaryModifier and TypeMethodDescriptionstatic booleanFind if all processes for the user on workId are dead.static booleancanUserReadBlob(ReadableBlobMeta meta, String user, Map<String, Object> conf) static StringcontainerFilePath(String dir) static StringReturns the value of java.class.path System property.static intexecCommand(String... command) static voidextractZipFile(ZipFile zipFile, File toDir, String prefix) Extracts the given file to the given directory.static voidforceKillProcess(String pid) static ClientBlobStorestatic intgetComponentParallelism(Map<String, Object> topoConf, Object component) getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) static longgetDiskUsage(File dir) Takes an input dir or file and returns the disk usage on that local directory.static doublegetEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) static intgetEstimatedWorkerCountForRasTopo(Map<String, Object> topoConf, StormTopology topology) static StringgetFileOwner(String path) static longGet system free memory in megabytes.static BlobStoregetNimbusBlobStore(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector) static BlobStoregetNimbusBlobStore(Map<String, Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector) static intgetPathOwnerUid(String fpath) Get the userId of the onwer of the path by running "ls -dn path" command.static URLReturns the current thread classloader.static intGet the userId for a user name.static <T> List<T>interleaveAll(List<List<T>> nodeList) static booleanisAbsolutePath(String path) static booleanisAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user.static booleanisAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser.static booleanisAnyProcessAlive(Collection<Long> pids, int uid) Are any of the processes alive and running for the specified userId.static booleanisAnyProcessAlive(Collection<Long> pids, String user) Are any of the processes alive and running for the specified user.static booleanisProcessAlive(long pid, String user) Is a process alive and running?.static booleanCheck if the scheduler is resource aware or not.static voidstatic longnimbusVersionOfBlob(String key, ClientBlobStore cb) static SubjectprincipalNameToSubject(String name) static StringscriptFilePath(String dir) static voidsendSignalToProcess(long lpid, int signum) static ServerUtilsProvide an instance of this class for delegates to use.static StringReturns the combined string, escaped for posix shell.static voidUnpack matching files from a jar.static voidstatic voidGiven a Tar File as input it will untar the file in a the untar directory passed as the second parameterstatic voidGiven a File input it will unzip the file in a the unzip directory passed as the second parameter.static voidvalidateTopologyAckerBundleResource(Map<String, Object> topoConf, StormTopology topology, String topoName) RAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker.static voidvalidateTopologyWorkerMaxHeapSizeConfigs(Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) static StringWrites a posix shell script file to be executed in its own process.static StringWrites a posix shell script file to be executed in its own process.static booleanzipDoesContainDir(String zipfile, String target) Determines if a zip archive contains a particular directory.static longzipFileSize(File myFile) Given a zip File input it will return its size Only works for zip files whose uncompressed size is less than 4 GB, otherwise returns the size module 2^32, per gzip specifications.
- 
Field Details- 
LOGpublic static final org.slf4j.Logger LOG
- 
IS_ON_WINDOWSpublic static final boolean IS_ON_WINDOWS
- 
SIGKILLpublic static final int SIGKILL- See Also:
 
- 
SIGTERMpublic static final int SIGTERM- See Also:
 
 
- 
- 
Constructor Details- 
ServerUtilspublic ServerUtils()
 
- 
- 
Method Details- 
setInstanceProvide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the implementation of the delegated method.- Parameters:
- u- a ServerUtils instance
- Returns:
- the previously set instance
 
- 
interleaveAll
- 
getNimbusBlobStorepublic static BlobStore getNimbusBlobStore(Map<String, Object> conf, NimbusInfo nimbusInfo, ILeaderElector leaderElector) 
- 
getNimbusBlobStorepublic static BlobStore getNimbusBlobStore(Map<String, Object> conf, String baseDir, NimbusInfo nimbusInfo, ILeaderElector leaderElector) 
- 
isAbsolutePath
- 
shellCmdReturns the combined string, escaped for posix shell.- Parameters:
- command- the list of strings to be combined
- Returns:
- the resulting command string
 
- 
getDiskUsageTakes an input dir or file and returns the disk usage on that local directory. Very basic implementation.- Parameters:
- dir- The input dir to get the disk space of this local dir
- Returns:
- The total disk space of the input local directory
 
- 
getClientBlobStoreForSupervisor
- 
currentClasspathReturns the value of java.class.path System property. Kept separate for testing.- Returns:
- the classpath
 
- 
getResourceFromClassloaderReturns the current thread classloader.
- 
zipDoesContainDirDetermines if a zip archive contains a particular directory.- Parameters:
- zipfile- path to the zipped file
- target- directory being looked for in the zip.
- Returns:
- boolean whether or not the directory exists in the zip.
- Throws:
- IOException
 
- 
getFileOwner- Throws:
- IOException
 
- 
containerFilePath
- 
scriptFilePath
- 
writeScriptpublic static String writeScript(String dir, List<String> command, Map<String, String> environment) throws IOExceptionWrites a posix shell script file to be executed in its own process.- Parameters:
- dir- the directory under which the script is to be written
- command- the command the script is to execute
- environment- optional environment variables to set before running the script's command. May be null.
- Returns:
- the path to the script that has been written
- Throws:
- IOException
 
- 
writeScriptpublic static String writeScript(String dir, List<String> command, Map<String, String> environment, String umask) throws IOExceptionWrites a posix shell script file to be executed in its own process.- Parameters:
- dir- the directory under which the script is to be written
- command- the command the script is to execute
- environment- optional environment variables to set before running the script's command. May be null.
- umask- umask to be set. It can be null.
- Returns:
- the path to the script that has been written
- Throws:
- IOException
 
- 
execCommandpublic static int execCommand(String... command) throws org.apache.commons.exec.ExecuteException, IOException - Throws:
- org.apache.commons.exec.ExecuteException
- IOException
 
- 
sendSignalToProcess- Throws:
- IOException
 
- 
killProcessWithSigTerm- Throws:
- IOException
 
- 
forceKillProcess- Throws:
- IOException
 
- 
nimbusVersionOfBlobpublic static long nimbusVersionOfBlob(String key, ClientBlobStore cb) throws AuthorizationException, KeyNotFoundException 
- 
canUserReadBlob
- 
unJarUnpack matching files from a jar. Entries inside the jar that do not match the given pattern will be skipped.- Parameters:
- jarFile- the .jar file to unpack
- toDir- the destination directory into which to unpack the jar
- Throws:
- IOException
 
- 
unTarGiven a Tar File as input it will untar the file in a the untar directory passed as the second parameter This utility will untar ".tar" files and ".tar.gz","tgz" files.- Parameters:
- inFile- The tar file as input
- untarDir- The untar directory where to untar the tar file
- symlinksDisabled- true if symlinks should be disabled, else false
- Throws:
- IOException
 
- 
unpack- Throws:
- IOException
 
- 
extractZipFileExtracts the given file to the given directory. Only zip entries starting with the given prefix are extracted. The prefix is stripped off entry names before extraction.- Parameters:
- zipFile- The zip file to extract
- toDir- The directory to extract to
- prefix- The prefix to look for in the zip file. If not null only paths starting with the prefix will be extracted
- Throws:
- IOException
 
- 
unZipGiven a File input it will unzip the file in a the unzip directory passed as the second parameter.- Parameters:
- inFile- The zip file as input
- toDir- The unzip directory where to unzip the zip file
- Throws:
- IOException
 
- 
zipFileSizeGiven a zip File input it will return its size Only works for zip files whose uncompressed size is less than 4 GB, otherwise returns the size module 2^32, per gzip specifications.- Parameters:
- myFile- The zip file as input
- Returns:
- zip file size as a long
- Throws:
- IOException
 
- 
isRasCheck if the scheduler is resource aware or not.- Parameters:
- conf- The configuration
- Returns:
- True if it's resource aware; false otherwise
 
- 
getEstimatedWorkerCountForRasTopopublic static int getEstimatedWorkerCountForRasTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException- Throws:
- InvalidTopologyException
 
- 
getEstimatedTotalHeapMemoryRequiredByTopopublic static double getEstimatedTotalHeapMemoryRequiredByTopo(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException- Throws:
- InvalidTopologyException
 
- 
getComponentParallelismpublic static Map<String,Integer> getComponentParallelism(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException- Throws:
- InvalidTopologyException
 
- 
getComponentParallelismpublic static int getComponentParallelism(Map<String, Object> topoConf, Object component) throws InvalidTopologyException- Throws:
- InvalidTopologyException
 
- 
principalNameToSubject
- 
currentClasspathImpl
- 
getResourceFromClassloaderImpl
- 
getMemInfoFreeMbGet system free memory in megabytes.- Returns:
- system free memory in megabytes
- Throws:
- IOException- on I/O exception
 
- 
isProcessAliveIs a process alive and running?.- Parameters:
- pid- the PID of the running process
- user- the user that is expected to own that process
- Returns:
- true if it is, else false
- Throws:
- IOException- on any error
 
- 
isAnyProcessAliveAre any of the processes alive and running for the specified user. If collection is empty or null then the return value is trivially false.- Parameters:
- pids- the PIDs of the running processes
- user- the user that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
- IOException- on I/O exception
 
- 
isAnyProcessAliveAre any of the processes alive and running for the specified userId. If collection is empty or null then the return value is trivially false.- Parameters:
- pids- the PIDs of the running processes
- uid- the user that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
- IOException- on I/O exception
 
- 
getUserIdGet the userId for a user name. This works on Posix systems by using "id -u" command. Throw IllegalArgumentException on Windows.- Parameters:
- user- username to be converted to UID. This is optional, in which case current user is returned.
- Returns:
- UID for the specified user (if supplied), else UID of current user, -1 upon Exception.
 
- 
getPathOwnerUidGet the userId of the onwer of the path by running "ls -dn path" command. This command works on Posix systems only.- Parameters:
- fpath- full path to the file or directory.
- Returns:
- UID for the specified if successful, -1 upon failure.
 
- 
areAllProcessesDeadpublic static boolean areAllProcessesDead(Map<String, Object> conf, String user, String workerId, Set<Long> pids) throws IOExceptionFind if all processes for the user on workId are dead. This method attempts to optimize the calls by:
- checking a collection of ProcessIds at once
- using userId one Posix systems instead of user
- Returns:
- true if all processes for the user are dead on the worker
- Throws:
- IOException- if external commands have exception.
 
- 
isAnyPosixProcessPidDirAlivepublic static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String user) throws IOException Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied user. This is an alternative to "ps -p pid -u uid" command used inisAnyPosixProcessAlive(Collection, int)Processes are tracked using the existence of the directory "/proc/<pid> For each of the supplied PIDs, their PID directory is checked for existence and ownership by the specified uid. - Parameters:
- pids- Process IDs that need to be monitored for liveness
- user- the userId that is expected to own that process
- Returns:
- true if any one of the processes is owned by user and alive, else false
- Throws:
- IOException- on I/O exception
 
- 
isAnyPosixProcessPidDirAlivepublic static boolean isAnyPosixProcessPidDirAlive(Collection<Long> pids, String expectedUser, boolean mockFileOwnerToUid) throws IOException Find if the process is alive using the existence of /proc/<pid> directory owned by the supplied expectedUser. This is an alternative to "ps -p pid -u uid" command used inisAnyPosixProcessAlive(Collection, int)Processes are tracked using the existence of the directory "/proc/<pid> For each of the supplied PIDs, their PID directory is checked for existence and ownership by the specified uid. - Parameters:
- pids- Process IDs that need to be monitored for liveness
- expectedUser- the userId that is expected to own that process
- mockFileOwnerToUid- if true (used for testing), then convert File.owner to UID
- Returns:
- true if any one of the processes is owned by expectedUser and alive, else false
- Throws:
- IOException- on I/O exception
 
- 
validateTopologyWorkerMaxHeapSizeConfigspublic static void validateTopologyWorkerMaxHeapSizeConfigs(Map<String, Object> stormConf, StormTopology topology, double defaultWorkerMaxHeapSizeMb) throws InvalidTopologyException- Throws:
- InvalidTopologyException
 
- 
validateTopologyAckerBundleResourcepublic static void validateTopologyAckerBundleResource(Map<String, Object> topoConf, StormTopology topology, String topoName) throws InvalidTopologyExceptionRAS scheduler will try to distribute ackers evenly over workers by adding some ackers to each newly launched worker. Validations are performed here: (Config.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKER* memory for an acker + memory for the biggest topo executor) < max worker heap memory. When RAS tries to schedule an executor to a new worker, it will putConfig.TOPOLOGY_RAS_ACKER_EXECUTORS_PER_WORKERackers into the worker first. SoConfig.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MBneed to be able to accommodate this.- Parameters:
- topoConf- Topology conf
- topology- Topology (not system topology)
- topoName- The name of the topology
- Throws:
- InvalidTopologyException
 
 
-