Do project log writing synchronously (in writer process).

model-merge
Ben Niemann 4 years ago
parent e3140b78fd
commit 879149fb41
  1. 269
      noisicaa/core/storage.py
  2. 12
      noisicaa/core/storage_test.py

@ -28,8 +28,6 @@ import os
import os.path
import time
import struct
import queue
import threading
from typing import cast, Dict, List, Set, Tuple, IO
from mypy_extensions import TypedDict
@ -92,6 +90,7 @@ class ProjectStorage(object):
self.next_log_number = None # type: int
self.log_file_number = 0
self.log_fp = None # type: IO[bytes]
self.log_fp_map = {} # type: Dict[int, IO]
self.log_index_formatter = struct.Struct('>QQ')
self.log_index = None # type: bytearray
@ -106,12 +105,9 @@ class ProjectStorage(object):
self.checkpoint_index_formatter = struct.Struct('>QQ')
self.checkpoint_index = None # type: bytearray
self.cache_lock = threading.RLock()
self.log_entry_cache = collections.OrderedDict() # type: collections.OrderedDict[int, LogEntry]
self.log_entry_cache_size = 20
self.write_queue = queue.Queue() # type: queue.Queue
self.writer_thread = threading.Thread(target=self._writer_main)
self.written_log_number = None # type: int
self.written_sequence_number = None # type: int
@ -142,6 +138,13 @@ class ProjectStorage(object):
self.file_lock = self.acquire_file_lock(
os.path.join(self.data_dir, "lock"))
log_path = os.path.join(self.data_dir, 'log.%06d' % self.log_file_number)
if os.path.exists(log_path):
mode = 'a+b'
else:
mode = 'w+b'
self.log_fp = open(log_path, mode=mode, buffering=0)
self.log_index_fp = open(
os.path.join(self.data_dir, 'log.index'),
mode='r+b', buffering=0)
@ -177,8 +180,6 @@ class ProjectStorage(object):
self.next_checkpoint_number * self.checkpoint_index_formatter.size):
raise CorruptedProjectError("Malformed checkpoint.index file.")
self.writer_thread.start()
def get_restore_info(self) -> Tuple[int, List[Tuple[Action, int]]]:
assert self.next_checkpoint_number > 0
@ -224,11 +225,6 @@ class ProjectStorage(object):
assert self.path is not None, "Project already closed."
self.path = None
if self.writer_thread is not None:
self.write_queue.put(('STOP', None))
self.writer_thread.join()
self.writer_thread = None
if self.log_index_fp is not None:
self.log_index_fp.close()
self.log_index_fp = None
@ -237,6 +233,10 @@ class ProjectStorage(object):
self.log_history_fp.close()
self.log_history_fp = None
if self.log_fp is not None:
self.log_fp.close()
self.log_fp = None
for log_fp in self.log_fp_map.values():
log_fp.close()
self.log_fp_map.clear()
@ -261,17 +261,59 @@ class ProjectStorage(object):
logger.info("Releasing file lock.")
lock_fp.close()
def _schedule_log_write(
def _write_log(
self, seq_number: int, history_entry: HistoryEntry, log_entry: LogEntry) -> None:
assert self.writer_thread.is_alive()
self.write_queue.put(
('LOG', (seq_number, history_entry, log_entry)))
log_number = history_entry[1]
logger.info("Writing log entry #%d...", seq_number)
if log_entry is not None:
offset = self.log_fp.tell()
def _schedule_checkpoint_write(
self.log_fp.write(struct.pack('>Q', len(log_entry)))
self.log_fp.write(log_entry)
self.log_fp.flush()
packed_index_entry = self.log_index_formatter.pack(
self.log_file_number, offset)
self.log_index_fp.write(packed_index_entry)
self.log_index_fp.flush()
packed_history_entry = self.log_history_formatter.pack(
*history_entry)
self.log_history_fp.write(packed_history_entry)
self.log_history_fp.flush()
assert seq_number > self.written_sequence_number
self.written_sequence_number = seq_number
if log_entry is not None:
self.log_index += packed_index_entry
assert log_number > self.written_log_number
self.written_log_number = log_number
def _write_checkpoint(
self, seq_number: int, checkpoint_number: int, checkpoint: Checkpoint) -> None:
assert self.writer_thread.is_alive()
self.write_queue.put(
('CHECKPOINT', (seq_number, checkpoint_number, checkpoint)))
checkpoint_path = os.path.join(
self.data_dir,
'checkpoint.%06d' % checkpoint_number)
logger.info("Writing checkpoint %s...", checkpoint_path)
with open(checkpoint_path, mode='wb', buffering=0) as fp:
header = storage_pb2.FileHeader()
header.type = 'checkpoint'
header.version = self.VERSION
header.create_timestamp = int(time.time())
header.size = len(checkpoint)
header.checksum_type = storage_pb2.FileHeader.MD5
header.checksum = hashlib.md5(checkpoint).digest()
self._write_file_header(fp, header)
fp.write(checkpoint)
packed_index_entry = self.checkpoint_index_formatter.pack(
seq_number, checkpoint_number)
self.checkpoint_index_fp.write(packed_index_entry)
self.checkpoint_index_fp.flush()
def _write_file_header(self, fp: IO[bytes], header: storage_pb2.FileHeader) -> None:
fp.write(self.MAGIC)
@ -297,87 +339,6 @@ class ProjectStorage(object):
return header
def _writer_main(self) -> None:
logger.info("Log writer thread started.")
log_path = os.path.join(
self.data_dir,
'log.%06d' % self.log_file_number)
if os.path.exists(log_path):
mode = 'a+b'
else:
mode = 'w+b'
with open(log_path, mode=mode, buffering=0) as log_fp:
while True:
cmd, arg = self.write_queue.get()
if cmd == 'STOP':
break
elif cmd == 'FLUSH':
arg.set()
elif cmd == 'PAUSE':
arg.wait()
elif cmd == 'LOG':
seq_number, history_entry, log_entry = arg
log_number = history_entry[1]
logger.info("Writing log entry #%d...", seq_number)
if log_entry is not None:
offset = log_fp.tell()
log_fp.write(struct.pack('>Q', len(log_entry)))
log_fp.write(log_entry)
log_fp.flush()
packed_index_entry = self.log_index_formatter.pack(
self.log_file_number, offset)
self.log_index_fp.write(packed_index_entry)
self.log_index_fp.flush()
packed_history_entry = self.log_history_formatter.pack(
*history_entry)
self.log_history_fp.write(packed_history_entry)
self.log_history_fp.flush()
with self.cache_lock:
assert seq_number > self.written_sequence_number
self.written_sequence_number = seq_number
if log_entry is not None:
self.log_index += packed_index_entry
assert log_number > self.written_log_number
self.written_log_number = log_number
elif cmd == 'CHECKPOINT':
seq_number, checkpoint_number, checkpoint = arg
checkpoint_path = os.path.join(
self.data_dir,
'checkpoint.%06d' % checkpoint_number)
logger.info("Writing checkpoint %s...", checkpoint_path)
with open(checkpoint_path, mode='wb', buffering=0) as fp:
header = storage_pb2.FileHeader()
header.type = 'checkpoint'
header.version = self.VERSION
header.create_timestamp = int(time.time())
header.size = len(checkpoint)
header.checksum_type = storage_pb2.FileHeader.MD5
header.checksum = hashlib.md5(checkpoint).digest()
self._write_file_header(fp, header)
fp.write(checkpoint)
packed_index_entry = self.checkpoint_index_formatter.pack(
seq_number, checkpoint_number)
self.checkpoint_index_fp.write(packed_index_entry)
self.checkpoint_index_fp.flush()
else: # pragma: no coverage
raise ValueError("Invalud command %r" % cmd)
logger.info("Log writer thread finished.")
def _get_history_entry(self, seq_number: int) -> HistoryEntry:
size = self.log_history_formatter.size
offset = seq_number * size
@ -401,12 +362,11 @@ class ProjectStorage(object):
return log_fp
def _read_log_entry(self, log_number: int) -> LogEntry:
with self.cache_lock:
size = self.log_index_formatter.size
offset = log_number * size
packed_index_entry = self.log_index[offset:offset+size]
file_number, file_offset = self.log_index_formatter.unpack(
packed_index_entry)
size = self.log_index_formatter.size
offset = log_number * size
packed_index_entry = self.log_index[offset:offset+size]
file_number, file_offset = self.log_index_formatter.unpack(
packed_index_entry)
log_fp = self._get_log_fp(file_number)
log_fp.seek(file_offset, os.SEEK_SET)
@ -425,9 +385,8 @@ class ProjectStorage(object):
return entry
def _add_log_entry(self, log_number: int, entry: LogEntry) -> None:
with self.cache_lock:
self.log_entry_cache[log_number] = entry
self.flush_cache(self.log_entry_cache_size)
self.log_entry_cache[log_number] = entry
self.flush_cache(self.log_entry_cache_size)
def _get_checkpoint_entry(self, checkpoint_number: int) -> CheckpointIndexEntry:
size = self.checkpoint_index_formatter.size
@ -436,51 +395,37 @@ class ProjectStorage(object):
return cast(CheckpointIndexEntry, self.checkpoint_index_formatter.unpack(packed_entry))
def flush_cache(self, cache_size: int) -> None:
with self.cache_lock:
entries_to_drop = len(self.log_entry_cache) - cache_size
if entries_to_drop > 0:
dropped_entries = set() # type: Set[int]
for ln in self.log_entry_cache.keys():
if len(dropped_entries) >= entries_to_drop:
break
if ln > self.written_log_number:
continue
dropped_entries.add(ln)
for ln in dropped_entries:
del self.log_entry_cache[ln]
def flush(self) -> None:
flushed = threading.Event()
self.write_queue.put(('FLUSH', flushed))
flushed.wait()
def pause(self) -> threading.Event:
evt = threading.Event()
self.write_queue.put(('PAUSE', evt))
return evt
entries_to_drop = len(self.log_entry_cache) - cache_size
if entries_to_drop > 0:
dropped_entries = set() # type: Set[int]
for ln in self.log_entry_cache.keys():
if len(dropped_entries) >= entries_to_drop:
break
if ln > self.written_log_number:
continue
dropped_entries.add(ln)
for ln in dropped_entries:
del self.log_entry_cache[ln]
def append_log_entry(self, entry: LogEntry) -> None:
assert self.path is not None, "Project already closed."
with self.cache_lock:
assert self.next_log_number not in self.log_entry_cache
self.undo_count = 0
self.redo_count = 0
assert self.next_log_number not in self.log_entry_cache
history_entry = (
ACTION_FORWARD.value, self.next_log_number,
self.undo_count, self.redo_count)
self.undo_count = 0
self.redo_count = 0
self._add_history_entry(history_entry)
self._add_log_entry(self.next_log_number, entry)
history_entry = (
ACTION_FORWARD.value, self.next_log_number,
self.undo_count, self.redo_count)
self._schedule_log_write(
self.next_sequence_number, history_entry, entry)
self._add_history_entry(history_entry)
self._add_log_entry(self.next_log_number, entry)
self._write_log(self.next_sequence_number, history_entry, entry)
self.next_log_number += 1
self.next_sequence_number += 1
self.next_log_number += 1
self.next_sequence_number += 1
@property
def can_undo(self) -> bool:
@ -511,34 +456,30 @@ class ProjectStorage(object):
assert self.can_undo
entry_to_undo = self.next_sequence_number - 2 * self.undo_count - 1
action, log_number = self._get_history_entry(entry_to_undo)[0:2]
with self.cache_lock:
action, log_number = self._get_history_entry(entry_to_undo)[0:2]
self.undo_count += 1
history_entry = (_reverse_action(action), log_number, self.undo_count, self.redo_count)
self._add_history_entry(history_entry)
self.undo_count += 1
history_entry = (_reverse_action(action), log_number, self.undo_count, self.redo_count)
self._add_history_entry(history_entry)
self._schedule_log_write(self.next_sequence_number, history_entry, None)
self._write_log(self.next_sequence_number, history_entry, None)
self.next_sequence_number += 1
self.next_sequence_number += 1
def redo(self) -> None:
assert self.path is not None, "Project already closed."
assert self.can_redo
entry_to_redo = self.next_sequence_number - 2 * self.undo_count
action, log_number = self._get_history_entry(entry_to_redo)[0:2]
with self.cache_lock:
action, log_number = self._get_history_entry(entry_to_redo)[0:2]
self.redo_count += 1
history_entry = (action, log_number, self.undo_count, self.redo_count)
self._add_history_entry(history_entry)
self.redo_count += 1
history_entry = (action, log_number, self.undo_count, self.redo_count)
self._add_history_entry(history_entry)
self._schedule_log_write(self.next_sequence_number, history_entry, None)
self._write_log(self.next_sequence_number, history_entry, None)
self.next_sequence_number += 1
self.next_sequence_number += 1
@property
def logs_since_last_checkpoint(self) -> int:
@ -548,7 +489,7 @@ class ProjectStorage(object):
return self.next_sequence_number - seq_number
def add_checkpoint(self, checkpoint: Checkpoint) -> None:
self._schedule_checkpoint_write(
self._write_checkpoint(
self.next_sequence_number, self.next_checkpoint_number, checkpoint)
packed_index_entry = self.checkpoint_index_formatter.pack(

@ -56,8 +56,6 @@ class StorageTest(unittest.TestCase):
(storage.ACTION_BACKWARD, b'bla1'))
self.assertFalse(ps.can_redo)
pause_evt = ps.pause()
ps.append_log_entry(b'bla2')
self.assertTrue(ps.can_undo)
self.assertEqual(
@ -81,8 +79,6 @@ class StorageTest(unittest.TestCase):
ps.get_log_entry_to_redo(),
(storage.ACTION_FORWARD, b'bla1'))
pause_evt.set()
ps.flush()
ps.flush_cache(0)
ps.redo()
@ -95,7 +91,6 @@ class StorageTest(unittest.TestCase):
ps.get_log_entry_to_redo(),
(storage.ACTION_FORWARD, b'bla2'))
ps.flush()
ps.flush_cache(0)
ps.redo()
self.assertTrue(ps.can_undo)
@ -104,7 +99,6 @@ class StorageTest(unittest.TestCase):
(storage.ACTION_BACKWARD, b'bla2'))
self.assertFalse(ps.can_redo)
ps.flush()
ps.flush_cache(0)
ps.append_log_entry(b'bla3')
self.assertTrue(ps.can_undo)
@ -115,8 +109,6 @@ class StorageTest(unittest.TestCase):
self.assertEqual(ps.next_sequence_number, 7)
ps.flush()
entries = list(ps.log_history_formatter.iter_unpack(ps.log_history))
self.assertEqual(
entries,
@ -228,13 +220,9 @@ class StorageTest(unittest.TestCase):
self.assertEqual(ps.logs_since_last_checkpoint, 1)
ps.undo()
self.assertEqual(ps.logs_since_last_checkpoint, 2)
unpause_evt = ps.pause()
ps.add_checkpoint(b'blurp2')
self.assertEqual(ps.logs_since_last_checkpoint, 0)
unpause_evt.set()
ps.flush()
entries = list(
ps.checkpoint_index_formatter.iter_unpack(
ps.checkpoint_index))

Loading…
Cancel
Save