-@contextmanager
-def atomically_replaced_file(name, mode='w', buffering=-1):
- """Yield a file that will be atomically renamed name when leaving the block.
-
- This contextmanager yields an open file object that is backed by a
- temporary file which will be renamed (atomically) to the target
- name if everything succeeds.
-
- The mode and buffering arguments are handled exactly as with open,
- and the yielded file will have very restrictive permissions, as
- per mkstemp.
-
- E.g.::
-
- with atomically_replaced_file('foo.txt', 'w') as f:
- f.write('hello jack.')
-
- """
-
- (ffd, tempname) = tempfile.mkstemp(dir=os.path.dirname(name),
- text=('b' not in mode))
- try:
- try:
- f = os.fdopen(ffd, mode, buffering)
- except:
- os.close(ffd)
- raise
- try:
- yield f
- finally:
- f.close()
- os.rename(tempname, name)
- finally:
- unlink(tempname) # nonexistant file is ignored
+class atomically_replaced_file:
+ def __init__(self, path, mode='w', buffering=-1):
+ """Return a context manager supporting the atomic replacement of a file.
+
+ The context manager yields an open file object that has been
+ created in a mkdtemp-style temporary directory in the same
+ directory as the path. The temporary file will be renamed to
+ the target path (atomically if the platform allows it) if
+ there are no exceptions, and the temporary directory will
+ always be removed. Calling cancel() will prevent the
+ replacement.
+
+ The file object will have a name attribute containing the
+ file's path, and the mode and buffering arguments will be
+ handled exactly as with open(). The resulting permissions
+ will also match those produced by open().
+
+ E.g.::
+
+ with atomically_replaced_file('foo.txt', 'w') as f:
+ f.write('hello jack.')
+
+ """
+ assert 'w' in mode
+ self.path = path
+ self.mode = mode
+ self.buffering = buffering
+ self.canceled = False
+ self.tmp_path = None
+ self.cleanup = ExitStack()
+ def __enter__(self):
+ with self.cleanup:
+ parent, name = os.path.split(self.path)
+ tmpdir = self.cleanup.enter_context(temp_dir(dir=parent,
+ prefix=name + b'-'))
+ self.tmp_path = tmpdir + b'/pending'
+ f = open(self.tmp_path, mode=self.mode, buffering=self.buffering)
+ f = self.cleanup.enter_context(f)
+ self.cleanup = self.cleanup.pop_all()
+ return f
+ def __exit__(self, exc_type, exc_value, traceback):
+ with self.cleanup:
+ if not (self.canceled or exc_type):
+ os.rename(self.tmp_path, self.path)
+ def cancel(self):
+ self.canceled = True