# fileutils - helper methods related to the file system
#
# Copyright (c) 2013 - 2019 Software AG, Darmstadt, Germany and/or its licensors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# $Id: fileutils.py 301527 2017-02-06 15:31:43Z matj $
#
"""
Functions for manipulating files and paths including `xpybuild.utils.fileutils.openForWrite`,
`xpybuild.utils.fileutils.mkdir`, `xpybuild.utils.fileutils.toLongPathSafe` and `xpybuild.utils.fileutils.parsePropertiesFile`.
"""
import shutil, os, os.path, time, platform, threading
import stat, sys
import io
from xpybuild.utils.flatten import getStringList
import subprocess, errno
import logging
log = logging.getLogger('fileutils')

__isWindows = platform.system()=='Windows'

if __isWindows:
    # Workaround required for windows filesystem semantics having a race condition between
    # writes from the POSIX API (which Python uses) and the win32 API (e.g. used by Java/C++).
    # Any failure to import win32file is deliberately allowed to propagate - we need to know about it.
    import win32file

    class _Win32FileWriter(io.RawIOBase):
        """
        Write-only file object that writes through the Win32 API (win32file) rather than
        the POSIX-style API used by the built-in open().

        The file is only created when the object is entered as a context manager, so
        instances must be used from a `with` clause.
        """

        def __init__(self, dest, mode='w', encoding=None, errors=None, newline=None):
            """
            @param dest: path of the file to create (any existing file is overwritten).
            @param mode: must contain 'w'; include 'b' for a binary stream, otherwise
            a text stream is returned from the `with` clause.
            @param encoding: passed to io.TextIOWrapper (text mode only).
            @param errors: passed to io.TextIOWrapper (text mode only).
            @param newline: passed to io.TextIOWrapper (text mode only).
            """
            super(_Win32FileWriter, self).__init__()
            assert 'w' in mode, 'Currently the Win32FileWriter class only supports writing, not reading'
            self.dest = dest
            # no handle exists until __enter__; close() must cope with that because
            # io.IOBase.__del__ calls close() even for objects that were never entered
            self.Fd = None
            self.__alreadyclosed = False
            self.__textWrapper = None if 'b' in mode else io.TextIOWrapper(self, encoding=encoding, errors=errors, newline=newline)

        def __enter__(self):
            """Create/truncate the destination file and return the stream to write to
            (the text wrapper in text mode, otherwise this object)."""
            self.Fd = win32file.CreateFile(self.dest, win32file.GENERIC_WRITE,
                win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE | win32file.FILE_SHARE_DELETE,
                None, win32file.CREATE_ALWAYS, win32file.FILE_ATTRIBUTE_NORMAL, None)
            if self.__textWrapper is not None: return self.__textWrapper
            return self

        def writable(self): return True

        def write(self, data):
            """Write bytes to the file using the Win32 (not POSIX) API and return the number of bytes written."""
            err, byteswritten = win32file.WriteFile(self.Fd, data)
            return byteswritten

        def close(self):
            """Flush and close the text wrapper (if any) and release the Win32 handle. Idempotent."""
            # make this idempotent (not least to avoid infinite loop when the text wrapper tries to close us)
            if self.__alreadyclosed: return
            self.__alreadyclosed = True
            try:
                if self.__textWrapper is not None: self.__textWrapper.close()
            finally:
                # always release the handle, even if flushing the text wrapper failed
                if self.Fd is not None:
                    win32file.CloseHandle(self.Fd)

        def __exit__(self, ex_type, ex_val, tb):
            self.close()

openForWrite = _Win32FileWriter if __isWindows else open
"""
Open a file for writing and return a corresponding text or binary stream file object.
This has the same semantics as open/io.open, but should be used instead of open/io.open
to avoid file system race conditions on Windows. This class must be used from a
`with` clause.
"""
def mkdir(newdir):
    """ Recursively create the specified directory if it doesn't already exist.

    If the directory is already present this is a no-op.

    @param newdir: The path to create.
    @return: newdir (exactly as passed in), to allow fluent use of this method.
    @raise IOError: if a file already exists at that path, or the directory could not be created.
    """
    longpath = normLongPath(newdir)

    if os.path.isdir(longpath):
        # nothing to do, it is already there
        return newdir

    if os.path.isfile(longpath):
        raise IOError("A file with the same name as the desired dir, '%s', already exists" % longpath)

    # several threads/processes may be racing to create the same directory,
    # so only treat a failure as an error if the directory is still missing afterwards
    try:
        os.makedirs(longpath, exist_ok=True)
    except Exception as e:
        if not os.path.isdir(longpath):
            raise IOError('Problem creating directory %s: %s' % (longpath, e))
    return newdir
def deleteDir(path, allowRetry=True):
    """ Recursively delete the specified directory and everything inside it.

    Does nothing if the path does not exist.

    Contains magic hacks so it works even on paths that exceed the Windows MAX_PATH 260 character length.

    @param path: the path to delete.
    @param allowRetry: set to False to disable automatic retry of the deletion after a few seconds (in case the error was
    transient)
    @raise OSError: if the path is a file rather than a directory, or if the directory could not be deleted.
    """
    def handleRemoveReadonly(func, path, exc):
        # Error callback for shutil.rmtree: func is the function that failed, path is the argument
        # it was called with and exc is the sys.exc_info() tuple. rmtree calls this from inside its
        # own except block, so a bare "raise" here re-raises the original error.
        # once we've got this working reliably, might reduce the level of some of these log statements
        excvalue = exc[1]
        log.info("handleRemoveReadonly: error removing path %s (%s %s), will try harder; exists=%s" % (path,errno.errorcode.get(excvalue.errno, "EUNKNOWN"), func, os.path.exists(path)))

        # Python 3's rmtree reports failed file removals as os.unlink (not os.remove), so that
        # must be included here or the recovery logic below never runs for files
        if func in (os.rmdir, os.remove, os.unlink):
            if not os.path.exists(path): # no idea why this happens, but on windows it does
                log.info("handleRemoveReadonly: suppressing spurious remove exception for already-deleted path: %s", path)
                return

            if excvalue.errno == errno.EACCES: # access denied, make it writable first
                try:
                    os.chmod(path, stat.S_IRWXU| stat.S_IRWXG| stat.S_IRWXO) # 0777
                    func(path)
                    log.info("handleRemoveReadonly: fixed by chmod: %s", path)
                    return
                except Exception:
                    log.exception('handleRemoveReadonly error while trying to handle EACCES: ')
                    if not os.path.exists(path):
                        log.info('handleRemoveReadonly gone now') # surely this never happens? if it does, change the code below
                    raise
            elif excvalue.errno == errno.ENOTEMPTY: # directory not empty, try again
                try:
                    log.info("handleRemoveReadonly: ENOTEMPTY dir - has contents: %s", os.listdir(path))
                except Exception as e:
                    log.info("handleRemoveReadonly: ENOTEMPTY dir, could not get contents: %s"%e)

                if allowRetry: # avoid danger of infinite recursion if things are going really wrong
                    deleteDir(path, allowRetry=False)
                    log.info("handleRemoveReadonly: fixed by retrying rmtree for ENOTEMPTY: %s", path)
                    return
            elif excvalue.errno == errno.ENOENT: # maybe windows went mad and deleted it anyway
                log.error("handleRemoveReadonly: ENOTENT error was raised by path that still exists: %s"%path)
                raise

        # if we didn't manage to handle this, rethrow
        log.warning("handleRemoveReadonly: still failed to remove path %s (%s %s); exists=%s" % (path,errno.errorcode.get(excvalue.errno, "EUNKNOWN"), func, os.path.exists(path)))
        raise

    path = normLongPath(path)
    if not os.path.exists(path):
        return

    try:
        shutil.rmtree(path, ignore_errors=False, onerror=handleRemoveReadonly)
    except OSError as e:
        if os.path.isfile(path):
            raise OSError("Unable to delete dir %s as this is a file not a directory" % (path))

        if allowRetry:
            log.warning("Failed to delete dir %s (%s), will retry in 10 seconds" %(path, e))

            # maybe it was a transient error, so try again a little later
            time.sleep(10.0)

            # on windows, try again using a separate process, just in case that
            # helps to avoid problems with virus checkers, etc
            if __isWindows:
                # discard all output: "nul" is the Windows null device and 2>&1 sends stderr
                # to the same place (rather than creating a stray file called "1")
                rmdirresult = os.system('rmdir /s /q "%s" >nul 2>&1'%path)
                log.info("Directory deletion retry using rmdir returned code %d: %s", rmdirresult, path)
                # continue to run deleteDir regardless of result, to check it's
                # really gone, and to give better error messages if we still
                # can't delete for any reason

            deleteDir(path, allowRetry=False)
            log.info("Deleted successfully on retry: %s", path)
        else:
            if os.path.exists(path):
                # maybe logging this is overkill, consider removing in future
                log.info("Unable to delete dir %s - original exception is: " % (path), exc_info=sys.exc_info())
            raise OSError("Unable to delete dir %s: %s" % (path, e))
def deleteFile(path, allowRetry=True):
    """Delete the specified file, with the option of automatically retrying once if the first attempt fails
    (to get around Windows weirdness), throwing an exception if the file still exists at the end of retrying.

    Use this instead of os.remove for improved robustness.

    Does nothing if the file doesn't already exist.

    Contains magic hacks so it works even on paths that exceed the Windows MAX_PATH 260 character length.

    @param path: The path to delete.
    @param allowRetry: If true, wait for a bit and retry the removal if it fails (default: true)
    @raise OSError: if the path is a directory, or the file could not be deleted.
    """
    # on contended windows machines a 5 second sleep isn't always sufficient to prevent error 32
    retryDelaySecs = 10.0

    path = normLongPath(path)
    try:
        if not os.path.lexists(path): return # use lexists in case we're deleting a symlink
        try:
            os.remove(path)
        except Exception:
            # only treat it as a failure if the file is actually still there
            if os.path.lexists(path):
                raise
    except OSError as e:
        if os.path.isdir(path):
            raise OSError("Unable to delete file %s as this is a directory not a file" % (path))

        if allowRetry:
            # report the delay actually used, so the message cannot drift from the sleep below
            log.debug("Failed to delete file %s on first attempt (%s), will retry in %g seconds", path, e, retryDelaySecs)
            # maybe it was a transient error, so try again a little later
            time.sleep(retryDelaySecs)
            deleteFile(path, allowRetry=False)
            log.debug("Deleted file successfully on retry: %s", path)
        else:
            if os.path.lexists(path):
                if os.path.basename(path) in ('%s'%e):
                    # the original message already names the file, so re-raise it unchanged
                    raise
                else:
                    raise OSError("Unable to delete file %s: %s" % (path, e))
def parsePropertiesFile(lines, excludeLines=None):
    """
    Parse the contents of the specified properties file or line list, and return an ordered list
    of (key,value,lineno) tuples.

    If desired, convert this to a dict using::

        {k:v for (k,v,lineno) in parsePropertiesFile(...)}

    Anything from a ``#`` to the end of a line is discarded, as are lines starting
    with ``//`` and lines that do not contain ``=``. Line numbers start at 1.

    @param lines: an open file handle or a sequence that can be iterated over to get each line in the file.
    @param excludeLines: a string or list of strings to search for; any KEY containing one of these strings will be ignored

    >>> parsePropertiesFile(['a','b=c',' z = x', 'a=d #foo', '#g=h'])
    [('b', 'c', 2), ('z', 'x', 3), ('a', 'd', 4)]
    >>> parsePropertiesFile(['a=b','c=d#foo','XfooX=e', 'f=h'], excludeLines='foo')
    [('a', 'b', 1), ('c', 'd', 2), ('f', 'h', 4)]
    >>> parsePropertiesFile(['a=b','c=d#foo','XfooX=e', 'f=h'], excludeLines=['foo','h'])
    [('a', 'b', 1), ('c', 'd', 2), ('f', 'h', 4)]
    """
    exclusions = getStringList(excludeLines)
    parsed = []
    for lineNo, rawline in enumerate(lines, 1):
        # drop any trailing comment, then surrounding whitespace
        line = rawline.partition('#')[0].strip()

        # '#' comments have already been stripped above, so only '//' comments,
        # blank lines and lines that are not key=value pairs remain to be skipped
        if not line or line.startswith('//') or '=' not in line:
            continue

        rawkey, _, rawvalue = line.partition('=')
        key = rawkey.strip()
        value = rawvalue.strip()

        if any(x in key for x in exclusions):
            log.debug('Ignoring property line due to exclusion: %s', line)
            continue

        # NB: we don't have a full implementation of .properties escaping yet
        # (only doubled backslashes are unescaped), but this is all we need for now
        parsed.append((key, value.replace('\\\\','\\'), lineNo))
    return parsed
if os.sep == '\\':
    def isDirPath(path):
        """ Returns true if the path names a directory, i.e. its last character is / or \\.

        None, empty strings and anything else that cannot be indexed give False.

        >>> isDirPath(None)
        False
        >>> isDirPath('/')
        True
        >>> isDirPath('a/')
        True
        >>> isDirPath('a'+os.sep)
        True
        """
        try:
            lastchar = path[-1]
            return lastchar in ('/', '\\')
        except Exception:
            return False
else:
    def isDirPath(path):
        """ Returns true if the path names a directory, i.e. its last character is /.

        None, empty strings and anything else that cannot be indexed give False.

        >>> isDirPath(None)
        False
        >>> isDirPath('/')
        True
        >>> isDirPath('a/')
        True
        >>> isDirPath('a'+os.sep)
        True
        """
        try:
            lastchar = path[-1]
            return lastchar == '/'
        except Exception:
            return False
__longPathCache = {} # GIL protects integrity of dict, no need for extra locking as it's only a cache

def toLongPathSafe(path, force=False):
    """
    Converts the specified path string to a form suitable for passing to API
    calls if it exceeds the maximum path length on this OS.

    Currently, this is necessary only on Windows, where a string
    starting with ``\\\\?\\`` must be used to get correct behaviour for long paths.

    Unlike L{normLongPath} which also performs the long path conversion, this
    function does NOT convert to a canonical form, normalize slashes or
    remove '..' elements (unless required for long path support). It is therefore
    faster.

    @param path: A path. Must not be a relative path. Can be None/empty. Can
    contain ".." sequences, though performance is a lot lower if it does.

    @param force: Normally the long path support is added only if this path
    exceeds the maximum length on this OS (e.g. 256 chars) or ends with a
    directory slash. Set force to True to add long path support regardless of
    length, which allows extra characters to be added on to the end of the
    string (e.g. ".log" or a directory filename) safely.

    @return: The passed-in path, possibly with a ``\\\\?\\`` prefix added and
    forward slashes converted to backslashes on Windows. UNC paths
    (``\\\\server\\share``) get the ``\\\\?\\UNC\\`` prefix instead. Any trailing slash
    is preserved by this function (though will be converted to a backslash).
    """
    if (not __isWindows) or (not path): return path
    if (force or len(path)>255 or isDirPath(path)) and not path.startswith('\\\\?\\'):
        if path in __longPathCache: return __longPathCache[path]
        inputpath = path
        # ".." is not permitted in \\?\ paths; normpath is expensive so don't do this unless we have to
        if '.' in path:
            path = os.path.normpath(path)+('\\' if isDirPath(path) else '')
            isunc = path.startswith('\\\\')
        else:
            # path is most likely to contain / so more efficient to conditionalize this
            path = path.replace('/','\\')
            # must detect a UNC path BEFORE collapsing repeated separators, since
            # the collapse also removes the leading \\ that identifies it
            isunc = path.startswith('\\\\')
            if '\\\\' in path:
                # consecutive \ separators are not permitted in \\?\ paths
                path = path.replace('\\\\','\\')

        if isunc:
            path = '\\\\?\\UNC\\'+path.lstrip('\\') # \\?\UNC\server\share Oh My
        else:
            path = '\\\\?\\'+path
        __longPathCache[inputpath] = path
    return path
__normLongPathCache = {} # GIL protects integrity of dict, no need for extra locking as it's only a cache

def normPath(path):
    """
    Normalizes but does NOT absolutize a path (os.path.normpath). This converts an absolute or relative path to a
    canonical form (e.g. normalizing the case of the drive letter on Windows), but unlike `normLongPath` does not
    add the ``\\\\?\\`` prefix needed to permit long paths or absolutize.

    A trailing directory slash on the input is kept on the result.

    @param path: the path to be converted; should be a unicode string where possible, as specifying a byte
    string will not work if the path contains non-ascii characters. None is returned unchanged.
    """
    if path is None: return None

    # remember whether the caller marked this as a directory, since normpath
    # (which also normalizes slashes) usually strips the trailing separator
    keepslash = isDirPath(path)
    normalized = os.path.normpath(path)
    if keepslash and not normalized.endswith(os.path.sep):
        normalized += os.path.sep

    # normpath does nothing to normalize case, and windows seems to be quite random about upper/lower case
    # for drive letters (more so than directory names), with different cmd prompts frequently using different
    # capitalization, so normalize at least that bit, to prevent spurious rebuilding from different prompts
    if (__isWindows and os.path.isabs(normalized) and len(normalized)>2
            and normalized[1] == ':' and 'A' <= normalized[0] <= 'Z'):
        normalized = normalized[0].lower()+normalized[1:]
    return normalized
def normLongPath(path):
    """
    Normalizes and absolutizes a path (os.path.abspath), converts to a canonical
    form (e.g. normalizing the case of the drive letter on Windows), and on
    windows adds the ``\\\\?\\`` prefix needed to force correct handling of long
    (>256 chars) paths (same as L{toLongPathSafe}).

    Results are cached, keyed on the path exactly as passed in.

    @param path: the path to be converted; should be a unicode string where possible, as specifying a byte
    string will not work if the path contains non-ascii characters. None is returned unchanged.
    """
    if path is None: return path

    # profiling shows normLongPath is surprisingly costly; caching results reduces dep checking by 2-3x
    try:
        return __normLongPathCache[path]
    except KeyError:
        pass

    longprefix = '\\\\?\\'
    result = path

    # currently there is some duplication between this and buildcommon.normpath which we ought to fix at some point

    # normpath does nothing to normalize case, and windows seems to be quite random about upper/lower case
    # for drive letters (more so than directory names), with different cmd prompts frequently using different
    # capitalization, so normalize at least that bit, to prevent spurious rebuilding from different prompts
    if __isWindows and len(result)>2 and result[1] == ':' and 'A' <= result[0] <= 'Z':
        result = result[0].lower()+result[1:]

    if __isWindows and result.startswith(longprefix):
        # already a long path: only the slashes need normalizing
        result = result.replace('/', '\\')
    else:
        # abspath also normalizes slashes, but drops any trailing directory slash
        trailing = os.path.sep if isDirPath(result) else ''
        result = os.path.abspath(result)+trailing

    if __isWindows and not result.startswith(longprefix):
        if result.startswith('\\\\'):
            result = '\\\\?\\UNC\\'+result.lstrip('\\') # \\?\UNC\server\share Oh My
        else:
            result = longprefix+result

    __normLongPathCache[path] = result
    return result
__statcache = {}
__statcache_get = __statcache.get

def cached_stat(path, errorIfMissing=False):
    """ Cached-once os.stat (DO NOT USE if you expect it to change after startup).

    @param path: the path to stat.
    @param errorIfMissing: set to True to raise an exception instead of returning False for a missing path.
    @return: the os.stat result, or False if the path could not be stat'ed.
    """
    cached = __statcache_get(path)
    if cached is None:
        # first request for this path; a failed stat is remembered as False
        try:
            cached = os.stat(path)
        except OSError: # means file doesn't exist
            cached = False
        __statcache[path] = cached

    if cached is not False:
        return cached
    if errorIfMissing:
        raise Exception('Cannot find path "%s"'%path)
    return False
def cached_getmtime(path):
    """ Cached-once os.getmtime (DO NOT USE if you expect it to change after startup).

    Raises an exception if the path does not exist.
    """
    info = cached_stat(path, errorIfMissing=True)
    return info.st_mtime
def cached_getsize(path):
    """ Cached-once os.path.getsize (DO NOT USE if you expect it to change after startup).

    Raises an exception if the path does not exist.
    """
    info = cached_stat(path, errorIfMissing=True)
    return info.st_size
def cached_exists(path):
    """ Cached-once os.path.exists (DO NOT USE if you expect it to change after startup) """
    info = cached_stat(path)
    return info is not False
def cached_isfile(path):
    """ Cached-once os.path.isfile (DO NOT USE if you expect it to change after startup) """
    info = cached_stat(path)
    if info is False:
        # missing paths are never files
        return False
    return stat.S_ISREG(info.st_mode)
def cached_isdir(path):
    """ Cached-once os.path.isdir (DO NOT USE if you expect it to change after startup) """
    info = cached_stat(path)
    if info is False:
        # missing paths are never directories
        return False
    return stat.S_ISDIR(info.st_mode)
# Aliases kept for compatibility with pre-3.0: each name below is simply
# bound to the corresponding cached_* function defined in this module.
getstat = cached_stat
""" .. private:: Use cached_ function instead. """
getmtime = cached_getmtime
""" .. private:: Use cached_ function instead. """
getsize = cached_getsize
""" .. private:: Use cached_ function instead. """
exists = cached_exists
""" .. private:: Use cached_ function instead. """
isfile = cached_isfile
""" .. private:: Use cached_ function instead. """
isdir = cached_isdir
""" .. private:: Use cached_ function instead. """
def _getStatCacheSize():
    """
    Internal diagnostic method returning how many paths currently have an entry in the stat cache.
    """
    entries = len(__statcache)
    return entries
def resetStatCache():
    """ Discards all cached stat data, so that later cached_* calls stat the file system again.

    .. private:: For internal use only.
    """
    __statcache.clear()