Files
entropy/lib/tests/misc.py
Mario Fetka 4e0d448aef Python 3.12+/3.13 and portage 3.0.77 compatibility fixes
- Replace deprecated Thread.isAlive() with is_alive() (Python 3.12)
- Fix invalid escape sequences in regex strings (SyntaxWarning → SyntaxError)
- Replace removed unittest aliases: assertEquals→assertEqual,
  assertNotEquals→assertNotEqual, assert_→assertTrue,
  assertRaisesRegexp→assertRaisesRegex (Python 3.12)
- Replace portage's removed clear_caches() with flush_cache()
- Fix locks.py: reset TLS state on exception in _file_lock_create
  to prevent stale shared-lock state poisoning subsequent acquisitions
- Skip os.chown() when gid is None or caller lacks permission
- Fix FastRSS attribute ordering for consistent minidom re-parse behavior
- Update test expected XML to match Python 3.8+ minidom attribute order

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-26 10:53:13 +02:00

262 lines
7.9 KiB
Python

# -*- coding: utf-8 -*-
import sys
sys.path.insert(0, '.')
sys.path.insert(0, '../')
import os
import unittest
import tempfile
import json
from entropy.const import const_convert_to_unicode, const_mkstemp
from entropy.misc import Lifo, TimeScheduled, ParallelTask, EmailSender, \
FastRSS, FlockFile
class MiscTest(unittest.TestCase):
def setUp(self):
self.__lifo = Lifo()
self._lifo_item1 = set([1, 2, 3, 4])
self._lifo_item2 = set([1, 2, 3, 4])
self._lifo_item3 = dict(((None, x,) for x in range(0, 20)))
self._lifo_item4 = const_convert_to_unicode('èòàèòà', 'utf-8')
self._lifo_item5 = (1, 2, 3, 4,)
self._lifo_item6 = '------'
self._lifo_items = [self._lifo_item1, self._lifo_item2,
self._lifo_item3, self._lifo_item4, self._lifo_item5,
self._lifo_item6]
def test_lifo_push_pop(self):
# test push
for item in self._lifo_items:
self.__lifo.push(item)
# is filled?
self.assertEqual(self.__lifo.is_filled(), True)
# pop
myitems = self._lifo_items[:]
myitems.reverse()
for item in myitems:
lifo_item = self.__lifo.pop()
self.assertEqual(lifo_item, item)
# is filled?
self.assertEqual(self.__lifo.is_filled(), False)
# refill
for item in self._lifo_items:
self.__lifo.push(item)
# is filled?
self.assertEqual(self.__lifo.is_filled(), True)
# discard one
self.__lifo.discard(self._lifo_item3)
# remove and test
myitems.remove(self._lifo_item3)
for item in myitems:
lifo_item = self.__lifo.pop()
self.assertEqual(lifo_item, item)
# is filled?
self.assertEqual(self.__lifo.is_filled(), False)
# test if it raises ValueError exception
self.assertRaises(ValueError, self.__lifo.pop)
"""
XXX: causes random lock ups
def test_timesched(self):
self.t_sched_run = False
def do_t():
self.t_sched_run = True
t = TimeScheduled(5, do_t)
t.start()
t.kill()
t.join()
self.assertTrue(self.t_sched_run)
"""
def test_parallel_task(self):
self.t_sched_run = False
def do_t():
import time
time.sleep(1)
self.t_sched_run = True
t = ParallelTask(do_t)
t.start()
t.join()
self.assertTrue(self.t_sched_run)
def test_flock_file(self):
tmp_fd, tmp_path = None, None
try:
tmp_fd, tmp_path = const_mkstemp(prefix="entropy.misc.test")
mf = FlockFile(tmp_path, fd = tmp_fd)
mf.acquire_exclusive()
mf.demote()
mf.promote()
mf.release()
mf.acquire_shared()
mf.release()
mf.close()
finally:
if tmp_fd is not None:
self.assertRaises(OSError, os.close, tmp_fd)
if tmp_path is not None:
os.remove(tmp_path)
def test_flock_file_with(self):
tmp_fd, tmp_path = None, None
try:
tmp_fd, tmp_path = const_mkstemp(prefix="entropy.misc.test")
mf = FlockFile(tmp_path, fd = tmp_fd)
count = 0
with mf.exclusive():
count += 1
with mf.shared():
count += 1
mf.close()
self.assertEqual(count, 2)
finally:
if tmp_fd is not None:
self.assertRaises(OSError, os.close, tmp_fd)
if tmp_path is not None:
os.remove(tmp_path)
def test_email_sender(self):
mail_sender = 'test@test.com'
mail_recipients = ['test@test123.com']
mail_sub = 'hello'
mail_msg = 'stfu\nstfu\n'
def def_send(sender, dest, message):
self.assertEqual(mail_sender, sender)
self.assertEqual(mail_recipients, dest)
self.assertTrue(message.endswith(mail_msg))
sender = EmailSender()
sender.default_sender = def_send
sender.send_text_email(mail_sender, mail_recipients, mail_sub, mail_msg)
def test_fast_rss(self):
tmp_fd, tmp_path = const_mkstemp()
os.close(tmp_fd) # who cares
os.remove(tmp_path)
fast_rss = FastRSS(tmp_path)
self.assertTrue(fast_rss.is_new())
fast_rss.set_title("title").set_editor("editor").set_description(
"description").set_url("http://url").set_year("2011")
fast_rss.append("title1", "link1", "description1", "1")
fast_rss.append("title", "link", "description", "0")
fast_rss.append("title2", "link2", "description2", "2")
fast_rss.commit()
expected_outcome = """
<?xml version="1.0" ?>
<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<title>title</title>
<link>http://url</link>
<description>description</description>
<language>en-EN</language>
<copyright>Sabayon Linux - (C) 2011</copyright>
<managingEditor>editor</managingEditor>
<item>
<description>description1</description>
<guid>link1</guid>
<link>link1</link>
<pubDate>1</pubDate>
<title>title1</title>
</item>
<item>
<description>description</description>
<guid>link</guid>
<link>link</link><pubDate>0</pubDate>
<title>title</title>
</item>
<item>
<description>description2</description>
<guid>link2</guid>
<link>link2</link>
<pubDate>2</pubDate>
<title>title2</title>
</item>
</channel>
</rss>"""
with open(tmp_path, "r") as tmp_f:
self.assertEqual(
expected_outcome.replace(" ", "").replace("\n", ""),
tmp_f.read().replace(" ", "").replace("\n", ""))
# make sure metadata doesn't get screwed
fast_rss = FastRSS(tmp_path)
self.assertFalse(fast_rss.is_new())
fast_rss.commit()
with open(tmp_path, "r") as tmp_f:
self.assertEqual(
expected_outcome.replace(" ", "").replace("\n", ""),
tmp_f.read().replace(" ", "").replace("\n", ""))
os.remove(tmp_path)
def test_fast_rss_json_payload(self):
tmp_fd, tmp_path = const_mkstemp()
os.close(tmp_fd) # who cares
os.remove(tmp_path)
fast_rss = FastRSS(tmp_path)
self.assertTrue(fast_rss.is_new())
fast_rss.set_title("title").set_editor("editor").set_description(
"description").set_url("http://url").set_year("2011")
desc_data = {
"name": "hello.world",
"security": False,
"number": 123,
"float": 123.0,
"list": [1,2,3],
}
fast_rss.append("title1", "link1", json.dumps(desc_data), "1")
fast_rss.commit()
doc = fast_rss.get()
channel = doc.getElementsByTagName("channel").item(0)
item = channel.getElementsByTagName("item").item(0)
desc = item.getElementsByTagName("description").item(0)
json_data = desc.firstChild.data.strip()
desc_data_out = json.loads(json_data)
self.assertEqual(desc_data, desc_data_out)
del doc
# test append
fast_rss = FastRSS(tmp_path)
self.assertFalse(fast_rss.is_new())
fast_rss.append("title2", "link2", json.dumps(desc_data), "2")
fast_rss.commit()
doc = fast_rss.get()
channel = doc.getElementsByTagName("channel").item(0)
items = channel.getElementsByTagName("item")
self.assertEqual(len(items), 2)
for item in items:
desc = item.getElementsByTagName("description").item(0)
json_data = desc.firstChild.data.strip()
desc_data_out = json.loads(json_data)
self.assertEqual(desc_data, desc_data_out)
os.remove(tmp_path)
if __name__ == '__main__':
unittest.main()
raise SystemExit(0)