Files
entropy/lib/tests/locks.py
Mario Fetka 4e0d448aef Python 3.12+/3.13 and portage 3.0.77 compatibility fixes
- Replace deprecated Thread.isAlive() with is_alive() (Python 3.12)
- Fix invalid escape sequences in regex strings (SyntaxWarning → SyntaxError)
- Replace removed unittest aliases: assertEquals→assertEqual,
  assertNotEquals→assertNotEqual, assert_→assertTrue,
  assertRaisesRegexp→assertRaisesRegex (Python 3.12)
- Replace portage's removed clear_caches() with flush_cache()
- Fix locks.py: reset TLS state on exception in _file_lock_create
  to prevent stale shared-lock state poisoning subsequent acquisitions
- Skip os.chown() when gid is None or caller lacks permission
- Fix FastRSS attribute ordering for consistent minidom re-parse behavior
- Update test expected XML to match Python 3.8+ minidom attribute order

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-26 10:53:13 +02:00

287 lines
7.9 KiB
Python

# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import threading
import time
import os
import unittest
from entropy.const import const_mkstemp
from entropy.locks import SimpleFileLock, EntropyResourcesLock
class EntropyLocksTest(unittest.TestCase):
    """Tests for SimpleFileLock and EntropyResourcesLock."""

    def _make_tmp_file(self, prefix):
        """
        Create a temporary file through const_mkstemp() and return its path.

        The file descriptor is closed and the file removed once the test
        finishes, whether it passed or not.
        """
        tmp_fd, tmp_path = const_mkstemp(prefix=prefix)
        self.addCleanup(self._remove_tmp_file, tmp_fd, tmp_path)
        return tmp_path

    @staticmethod
    def _remove_tmp_file(tmp_fd, tmp_path):
        """Close tmp_fd and remove tmp_path, ignoring OSError on removal."""
        os.close(tmp_fd)
        try:
            os.remove(tmp_path)
        except OSError:
            pass

    def test_simple_lock(self):
        """
        SimpleFileLock test.

        A path acquired through one lock map cannot be acquired through
        another map until the first one releases it.
        """
        sfl = SimpleFileLock
        tmp_path = self._make_tmp_file("test_simple_lock")

        lock_map = {}
        self.assertEqual(True, sfl.acquire(tmp_path, lock_map))
        self.assertIn(tmp_path, lock_map)
        self.assertIsNotNone(lock_map[tmp_path])

        # A second map must fail to acquire and must stay untouched.
        lock_map_new = {}
        self.assertEqual(False, sfl.acquire(tmp_path, lock_map_new))
        self.assertNotIn(tmp_path, lock_map_new)
        self.assertIn(tmp_path, lock_map)

        sfl.release(tmp_path, lock_map)

        self.assertEqual(True, sfl.acquire(tmp_path, lock_map_new))
        self.assertIn(tmp_path, lock_map_new)
        self.assertIsNotNone(lock_map_new[tmp_path])

        sfl.release(tmp_path, lock_map_new)

    def test_entropy_resources_lock(self):
        """
        EntropyResourcesLock single-threaded test.

        Walks through exclusive and shared acquisition (try_*, blocking and
        wait_* variants), checking is_already_acquired() after every step
        and the number of times the post-acquire hook has been called.
        """
        erl = EntropyResourcesLock()

        counter_l = [0]

        def _hook():
            counter_l[0] += 1

        erl.add_post_acquire_hook(_hook)

        tmp_path = self._make_tmp_file("test_entropy_resources_lock")
        # Point the lock at the temporary file.
        erl.path = lambda: tmp_path
        self.assertEqual(tmp_path, erl.path())

        self.assertFalse(erl.is_already_acquired())
        self.assertEqual(True, erl.try_acquire_exclusive())
        self.assertEqual(1, counter_l[0])
        self.assertTrue(erl.is_already_acquired())
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        self.assertEqual(True, erl.try_acquire_exclusive())
        self.assertEqual(2, counter_l[0])
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        self.assertEqual(True, erl.try_acquire_shared())
        self.assertEqual(3, counter_l[0])
        self.assertTrue(erl.is_already_acquired())
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        erl.acquire_exclusive()
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        erl.acquire_shared()
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        # The two blocking acquisitions above are expected to have fired
        # the hook as well, hence 6 after this one.
        self.assertEqual(True, erl.try_acquire_shared())
        self.assertEqual(6, counter_l[0])
        self.assertTrue(erl.is_already_acquired())

        # Requesting exclusive access while holding a shared lock is
        # expected to raise and to leave the shared lock in place.
        self.assertRaises(RuntimeError, erl.try_acquire_exclusive)
        self.assertTrue(erl.is_already_acquired())

        # Shared lock taken a second time: two releases are needed.
        self.assertEqual(True, erl.try_acquire_shared())
        self.assertEqual(7, counter_l[0])
        erl.release()
        self.assertTrue(erl.is_already_acquired())

        self.assertRaises(RuntimeError, erl.try_acquire_exclusive)
        self.assertTrue(erl.is_already_acquired())
        erl.release()

        self.assertEqual(True, erl.try_acquire_exclusive())
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        self.assertEqual(True, erl.wait_exclusive())
        self.assertTrue(erl.is_already_acquired())
        erl.release()
        self.assertFalse(erl.is_already_acquired())

        self.assertEqual(True, erl.wait_shared())
        self.assertTrue(erl.is_already_acquired())
        erl.release()
        self.assertFalse(erl.is_already_acquired())

    def test_entropy_resources_lock_exception(self):
        """
        EntropyResourcesLock acquisition counter and error paths test.

        Checks that a failed exclusive request does not prevent later
        acquisitions, that the 'count' value tracks nested acquisitions,
        and that releasing with nothing held raises RuntimeError.
        """
        erl = EntropyResourcesLock()

        tmp_path = self._make_tmp_file("test_entropy_resources_lock")
        erl.path = lambda: tmp_path
        get_count = lambda: erl._file_lock_setup()['count']

        self.assertEqual(True, erl.try_acquire_shared())
        self.assertRaises(RuntimeError, erl.try_acquire_exclusive)
        erl.release()

        # One exclusive plus three shared acquisitions: count must be 4.
        self.assertEqual(True, erl.try_acquire_exclusive())
        self.assertEqual(True, erl.try_acquire_shared())
        self.assertEqual(True, erl.try_acquire_shared())
        self.assertEqual(True, erl.try_acquire_shared())
        self.assertEqual(4, get_count())

        # Every release must drop the counter by exactly one.
        for expected_count in (3, 2, 1, 0):
            erl.release()
            self.assertEqual(expected_count, get_count())

        self.assertRaises(RuntimeError, erl.release)

    def test_entropy_resources_lock_threads(self):
        """
        ResourceLock multithreaded test.
        This test ensures that the resource is also contended
        between threads, not just processes.

        The main thread holds the exclusive lock while a worker thread
        polls try_acquire_exclusive(). The lock is released only after the
        worker has completed at least one failed attempt, and the worker
        must then acquire it. Any exception raised inside the worker is
        re-raised in the main thread so that it is reported by the runner.
        """
        erl = EntropyResourcesLock()

        tmp_path = self._make_tmp_file("test_entropy_resources_lock")
        erl.path = lambda: tmp_path
        get_count = lambda: erl._file_lock_setup()['count']
        get_ref = lambda: erl._file_lock_setup()['ref']

        other_thread_count = [0]
        other_thread_loop_count = [0]
        worker_done = [False]
        worker_errors = []
        cond = threading.Condition()

        self.assertEqual(True, erl.try_acquire_exclusive())
        self.assertEqual(1, get_count())
        self.assertIsNotNone(get_ref())
        self.assertEqual(0, other_thread_count[0])

        def try_acquire_thread():
            try:
                milliseconds = 10 * 1000
                acquired = False
                loop_n = 0
                while milliseconds:
                    # NOTE(review): the worker expects no 'ref' of its own
                    # although the main thread holds the lock; presumably
                    # the lock state is thread-local -- confirm.
                    self.assertIsNone(get_ref())
                    acquired = erl.try_acquire_exclusive()
                    if loop_n == 0:
                        # The main thread still holds the lock here.
                        self.assertFalse(acquired)
                    if acquired:
                        self.assertIsNotNone(get_ref())
                        other_thread_count[0] += 1
                        break
                    time.sleep(0.100)
                    milliseconds -= 100
                    loop_n += 1
                    with cond:
                        other_thread_loop_count[0] += 1
                        cond.notify()
                self.assertTrue(acquired)
            except Exception as exc:
                # Exceptions do not propagate out of a thread; keep the
                # failure so that the main thread can re-raise it.
                worker_errors.append(exc)
            finally:
                # Always wake the main thread up, even when the worker
                # failed before completing its first iteration.
                with cond:
                    worker_done[0] = True
                    cond.notify()

        th = threading.Thread(target=try_acquire_thread,
                              name="TryAcquireThread")
        th.start()
        try:
            with cond:
                while other_thread_loop_count[0] < 1 and not worker_done[0]:
                    cond.wait()
            erl.release()
        finally:
            # Never leave the worker running past the end of the test.
            th.join()

        if worker_errors:
            raise worker_errors[0]
        self.assertEqual(1, other_thread_count[0])
if __name__ == '__main__':
    # unittest.main() terminates the interpreter itself, with an exit
    # status reflecting the test outcome, so no explicit exit is needed
    # (a trailing "raise SystemExit(0)" would never be reached).
    unittest.main()