The new WebService get_documents() is more efficient in terms of server resources consumption since it doesn't force the WS engine to calculate the full result set length, which had little use anyway in Rigo. This commit switches entropy.client.services' DocumentList to and reverse dependencies to use the new WS API. The older WS API will be kept alive for a while (6 months, roughly).
992 lines
46 KiB
Python
992 lines
46 KiB
Python
# -*- coding: utf-8 -*-
|
|
import sys
|
|
import os
|
|
import tempfile
|
|
import unittest
|
|
sys.path.insert(0, '../')
|
|
sys.path.insert(0, '../../')
|
|
|
|
from entropy.client.interfaces import Client
|
|
from entropy.services.client import WebService
|
|
from entropy.client.services.interfaces import Document, DocumentFactory, \
|
|
DocumentList, ClientWebService
|
|
from entropy.const import etpConst, etpUi, const_convert_to_rawstring, \
|
|
const_convert_to_unicode, const_get_stringtype
|
|
import entropy.tools
|
|
import tests._misc as _misc
|
|
from entropy.core.settings.base import SystemSettings
|
|
|
|
|
|
class EntropyWebServicesTest(unittest.TestCase):
|
|
|
|
def __init__(self, *args):
|
|
unittest.TestCase.__init__(self, *args)
|
|
self._repository_id = \
|
|
SystemSettings()['repositories']['default_repository']
|
|
|
|
def setUp(self):
|
|
sys.stdout.write("%s called\n" % (self,))
|
|
sys.stdout.flush()
|
|
self._entropy = Client(installed_repo = -1, indexing = False,
|
|
xcache = False, repo_validation = False)
|
|
self._factory = self._entropy.WebServices()
|
|
self._fake_user = "entropy_unittest"
|
|
self._fake_pass = "entropy_unittest"
|
|
self._fake_unicode_user = const_convert_to_unicode("entropy_unittèst2",
|
|
enctype = "utf-8")
|
|
self._fake_unicode_pass = const_convert_to_unicode("entropy_unittèst",
|
|
enctype = "utf-8")
|
|
self._fake_package_name = "app-misc/entropy_unittest"
|
|
self._fake_package_name_utf8 = const_convert_to_unicode(
|
|
"app-misc/entropy_unìttest")
|
|
self._real_package_name = "media-sound/amarok"
|
|
|
|
def tearDown(self):
|
|
"""
|
|
tearDown is run after each test
|
|
"""
|
|
sys.stdout.write("%s ran\n" % (self,))
|
|
sys.stdout.flush()
|
|
# calling destroy() and shutdown()
|
|
# need to call destroy() directly to remove all the SystemSettings
|
|
# plugins because shutdown() doesn't, since it's meant to be called
|
|
# right before terminating the process
|
|
self._entropy.destroy()
|
|
self._entropy.shutdown()
|
|
|
|
def test_credentials(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
webserv.add_credentials("lxnay", "test")
|
|
self.assertEqual(webserv.get_credentials(), "lxnay")
|
|
self.assertEqual(webserv.credentials_available(), True)
|
|
self.assertTrue(webserv.remove_credentials())
|
|
self.assertEqual(webserv.credentials_available(), False)
|
|
|
|
def test_credentials_utf8(self):
|
|
user = const_convert_to_unicode("lxnày")
|
|
password = const_convert_to_unicode("pààààss")
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
webserv.add_credentials(user, password)
|
|
self.assertEqual(webserv.get_credentials(), user)
|
|
self.assertEqual(webserv.credentials_available(), True)
|
|
self.assertTrue(webserv.remove_credentials())
|
|
self.assertEqual(webserv.credentials_available(), False)
|
|
|
|
def test_validate_credentials(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
webserv.remove_credentials()
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
try:
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_validate_credentials_error(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
webserv.remove_credentials()
|
|
user = const_convert_to_unicode("lxnay")
|
|
password = const_convert_to_unicode("paasssdsss")
|
|
webserv.add_credentials(user, password)
|
|
try:
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), user)
|
|
# credentials must be INVALID
|
|
self.assertRaises(WebService.AuthenticationFailed,
|
|
webserv.validate_credentials)
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_validate_credentials_utf8(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
webserv.remove_credentials()
|
|
webserv.add_credentials(self._fake_unicode_user,
|
|
self._fake_unicode_pass)
|
|
try:
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_unicode_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_query_utf8(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
# this must raise WebService.MethodResponseError
|
|
self.assertRaises(WebService.MethodResponseError,
|
|
webserv.get_votes, (self._fake_package_name_utf8,))
|
|
|
|
def test_get_votes(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
# this must return valid data
|
|
vote_data = webserv.get_votes([self._fake_package_name], cache = False)
|
|
self.assertTrue(isinstance(vote_data, dict))
|
|
self.assertTrue(self._fake_package_name in vote_data)
|
|
if vote_data[self._fake_package_name] is not None:
|
|
self.assertTrue(isinstance(vote_data[self._fake_package_name], float))
|
|
else:
|
|
self.assertTrue(vote_data[self._fake_package_name] is None)
|
|
|
|
def test_get_available_votes(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
# this must return valid data
|
|
vote_data = webserv.get_available_votes(cache = False)
|
|
self.assertTrue(isinstance(vote_data, dict))
|
|
self.assertTrue(self._real_package_name in vote_data)
|
|
self.assertTrue(isinstance(vote_data[self._real_package_name], float))
|
|
for key, val in vote_data.items():
|
|
self.assertTrue(entropy.tools.validate_package_name(key))
|
|
self.assertTrue(isinstance(val, float))
|
|
self.assertTrue(int(val) in ClientWebService.VALID_VOTES)
|
|
|
|
def test_get_votes_cannot_exists(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
key = "app-doesntexistforsure/asdweasfoo"
|
|
# this must return valid data
|
|
vote_data = webserv.get_votes([key], cache = False)
|
|
self.assertTrue(isinstance(vote_data, dict))
|
|
self.assertTrue(key in vote_data)
|
|
self.assertTrue(vote_data[key] is None)
|
|
|
|
def test_add_vote(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
# try with success
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_vote(self._fake_package_name, 4)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
webserv.add_vote(self._fake_package_name, 4)
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if average vote is still 4.0
|
|
vote = webserv.get_votes(
|
|
[self._fake_package_name])[self._fake_package_name]
|
|
self.assertEqual(vote, 4.0)
|
|
|
|
def test_add_vote_failure(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
# try with success
|
|
webserv.remove_credentials()
|
|
invalid_package_name = self._fake_package_name + "'''"
|
|
try:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
webserv.add_vote(invalid_package_name, 4)
|
|
self.assertTrue(False)
|
|
except webserv.MethodResponseError:
|
|
self.assertTrue(True) # valid
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if average vote is still 4.0
|
|
try:
|
|
vote = webserv.get_votes(
|
|
[invalid_package_name])[invalid_package_name]
|
|
self.assertTrue(False)
|
|
except webserv.MethodResponseError:
|
|
self.assertTrue(True) # valid
|
|
|
|
def test_get_available_downloads(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
# this must return valid data
|
|
down_data = webserv.get_available_downloads(cache = False)
|
|
self.assertTrue(isinstance(down_data, dict))
|
|
self.assertTrue(self._real_package_name in down_data)
|
|
self.assertTrue(isinstance(down_data[self._real_package_name], int))
|
|
for key, val in down_data.items():
|
|
self.assertTrue(entropy.tools.validate_package_name(key))
|
|
self.assertTrue(isinstance(val, int))
|
|
self.assertTrue(val >= 0)
|
|
|
|
def test_add_downloads(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
pk = self._fake_package_name
|
|
pkg_list = [pk]
|
|
cur_downloads = webserv.get_downloads(pkg_list, cache = False)[pk]
|
|
if cur_downloads is None:
|
|
cur_downloads = 0
|
|
|
|
# can be False if the test is run repeatedly, due to the anti-flood
|
|
# protection
|
|
added = webserv.add_downloads([self._fake_package_name])
|
|
self.assertTrue(isinstance(added, bool))
|
|
|
|
# expect (cur_downloads + 1) now, use cache, so to check if cache
|
|
# is cleared correctly
|
|
expected_downloads = cur_downloads
|
|
if added:
|
|
expected_downloads += 1
|
|
new_downloads = webserv.get_downloads(pkg_list)[pk]
|
|
self.assertEqual(expected_downloads, new_downloads)
|
|
|
|
def test_add_icon(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
description = const_convert_to_unicode("descrìption")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
tmp_f.write(const_convert_to_rawstring('\x89PNG\x00\x00'))
|
|
tmp_f.flush()
|
|
tmp_f.seek(0)
|
|
doc = doc_factory.icon(self._fake_user, tmp_f, title,
|
|
description, keywords)
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
new_doc = webserv.add_document(self._fake_package_name, doc)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if document is there
|
|
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
|
|
remote_doc = webserv.get_documents_by_id([doc_id],
|
|
cache = False)[doc_id]
|
|
self.assertTrue(new_doc.is_icon())
|
|
self.assertTrue(remote_doc.is_icon())
|
|
self.assertTrue(not remote_doc.is_comment())
|
|
self.assertTrue(not remote_doc.is_image())
|
|
self.assertTrue(not remote_doc.is_video())
|
|
self.assertTrue(not remote_doc.is_file())
|
|
self.assertEqual(new_doc.repository_id(), self._repository_id)
|
|
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
|
|
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
|
|
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
|
|
self.assertEqual(new_doc.document_keywords(), keywords)
|
|
|
|
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
remote_doc[Document.DOCUMENT_DESCRIPTION_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
description)
|
|
self.assertEqual(new_doc[Document.DOCUMENT_TITLE_ID],
|
|
remote_doc[Document.DOCUMENT_TITLE_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_TITLE_ID], title)
|
|
self.assertEqual(new_doc.document_timestamp(),
|
|
remote_doc.document_timestamp())
|
|
self.assertEqual(new_doc.document_keywords(),
|
|
remote_doc.document_keywords())
|
|
|
|
# now try to remove
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertTrue(
|
|
webserv.remove_document(
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_add_icon_fail(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
description = const_convert_to_unicode("descrìption")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
img_dump = '\x89\xFF\x00\x00\x89\xFF\x89\xFF\x89\xFF'
|
|
tmp_f.write(const_convert_to_rawstring(img_dump))
|
|
tmp_f.flush()
|
|
tmp_f.seek(0)
|
|
doc = doc_factory.icon(self._fake_user, tmp_f, title,
|
|
description, keywords)
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertRaises(WebService.MethodResponseError,
|
|
webserv.add_document, self._fake_package_name, doc)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_add_image_fail(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
description = const_convert_to_unicode("descrìption")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
img_dump = '\x89\xFF\x00\x00\x89\xFF\x89\xFF\x89\xFF'
|
|
tmp_f.write(const_convert_to_rawstring(img_dump))
|
|
tmp_f.flush()
|
|
tmp_f.seek(0)
|
|
doc = doc_factory.image(self._fake_user, tmp_f, title,
|
|
description, keywords)
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertRaises(WebService.MethodResponseError,
|
|
webserv.add_document, self._fake_package_name, doc)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_add_image(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
description = const_convert_to_unicode("descrìption")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
tmp_f.write(const_convert_to_rawstring('\x89PNG\x00\x00'))
|
|
tmp_f.flush()
|
|
tmp_f.seek(0)
|
|
doc = doc_factory.image(self._fake_user, tmp_f, title,
|
|
description, keywords)
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
new_doc = webserv.add_document(self._fake_package_name, doc)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if document is there
|
|
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
|
|
remote_doc = webserv.get_documents_by_id([doc_id],
|
|
cache = False)[doc_id]
|
|
self.assertTrue(new_doc.is_image())
|
|
self.assertTrue(remote_doc.is_image())
|
|
self.assertTrue(not remote_doc.is_comment())
|
|
self.assertTrue(not remote_doc.is_icon())
|
|
self.assertTrue(not remote_doc.is_video())
|
|
self.assertTrue(not remote_doc.is_file())
|
|
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
|
|
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
|
|
self.assertEqual(new_doc.repository_id(), self._repository_id)
|
|
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
|
|
self.assertEqual(new_doc.document_keywords(), keywords)
|
|
|
|
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
remote_doc[Document.DOCUMENT_DESCRIPTION_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
description)
|
|
self.assertEqual(new_doc[Document.DOCUMENT_TITLE_ID],
|
|
remote_doc[Document.DOCUMENT_TITLE_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_TITLE_ID], title)
|
|
self.assertEqual(new_doc.document_timestamp(),
|
|
remote_doc.document_timestamp())
|
|
self.assertEqual(new_doc.document_keywords(),
|
|
remote_doc.document_keywords())
|
|
|
|
# now try to remove
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertTrue(
|
|
webserv.remove_document(
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_add_comment(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
comment = const_convert_to_unicode("comment hellò")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
doc = doc_factory.comment(self._fake_user, comment, title, keywords)
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
new_doc = webserv.add_document(self._fake_package_name, doc)
|
|
self.assertTrue(new_doc is not None)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if document is there
|
|
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
|
|
remote_doc = webserv.get_documents_by_id([doc_id],
|
|
cache = False)[doc_id]
|
|
self.assertTrue(remote_doc is not None)
|
|
self.assertTrue(new_doc.is_comment())
|
|
self.assertTrue(remote_doc.is_comment())
|
|
self.assertTrue(not remote_doc.is_image())
|
|
self.assertTrue(not remote_doc.is_icon())
|
|
self.assertTrue(not remote_doc.is_video())
|
|
self.assertTrue(not remote_doc.is_file())
|
|
self.assertEqual(new_doc.repository_id(), self._repository_id)
|
|
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
|
|
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
|
|
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
|
|
self.assertEqual(new_doc.document_keywords(), keywords)
|
|
|
|
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_DATA_ID], comment)
|
|
self.assertEqual(new_doc[Document.DOCUMENT_TITLE_ID],
|
|
remote_doc[Document.DOCUMENT_TITLE_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_TITLE_ID], title)
|
|
self.assertEqual(new_doc.document_timestamp(),
|
|
remote_doc.document_timestamp())
|
|
self.assertEqual(new_doc.document_keywords(),
|
|
remote_doc.document_keywords())
|
|
|
|
# now try to remove
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertTrue(
|
|
webserv.remove_document(
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_add_file(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available())
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
description = const_convert_to_unicode("descrìption")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
tmp_f.write(const_convert_to_rawstring('BZ2\x00\x00'))
|
|
tmp_f.flush()
|
|
tmp_f.seek(0)
|
|
doc = doc_factory.file(self._fake_user, tmp_f, title,
|
|
description, keywords)
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
new_doc = webserv.add_document(self._fake_package_name, doc)
|
|
self.assertTrue(new_doc is not None)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if document is there
|
|
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
|
|
remote_doc = webserv.get_documents_by_id([doc_id],
|
|
cache = False)[doc_id]
|
|
self.assertTrue(new_doc.is_file())
|
|
self.assertTrue(remote_doc.is_file())
|
|
self.assertTrue(not remote_doc.is_comment())
|
|
self.assertTrue(not remote_doc.is_icon())
|
|
self.assertTrue(not remote_doc.is_video())
|
|
self.assertTrue(not remote_doc.is_image())
|
|
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
|
|
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
|
|
self.assertEqual(new_doc.repository_id(), self._repository_id)
|
|
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
|
|
self.assertEqual(new_doc.document_keywords(), keywords)
|
|
|
|
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
remote_doc[Document.DOCUMENT_DESCRIPTION_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
description)
|
|
self.assertEqual(new_doc[Document.DOCUMENT_TITLE_ID],
|
|
remote_doc[Document.DOCUMENT_TITLE_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_TITLE_ID], title)
|
|
self.assertEqual(new_doc.document_timestamp(),
|
|
remote_doc.document_timestamp())
|
|
self.assertEqual(new_doc.document_keywords(),
|
|
remote_doc.document_keywords())
|
|
|
|
# now try to remove
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertTrue(
|
|
webserv.remove_document(
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_add_video(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available())
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
description = const_convert_to_unicode("descrìption")
|
|
title = const_convert_to_unicode("tìtle")
|
|
|
|
test_video_file = _misc.get_test_video_file()
|
|
with open(test_video_file, "rb") as tmp_f:
|
|
doc = doc_factory.video(self._fake_user, tmp_f, title,
|
|
description, keywords)
|
|
# do not actually publish the video
|
|
doc['pretend'] = 1
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.add_document(self._fake_package_name, doc)
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
new_doc = webserv.add_document(self._fake_package_name, doc)
|
|
self.assertTrue(new_doc is not None)
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
# now check back if document is there
|
|
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
|
|
remote_doc = webserv.get_documents_by_id([doc_id],
|
|
cache = False)[doc_id]
|
|
self.assertTrue(new_doc.is_video())
|
|
self.assertTrue(remote_doc.is_video())
|
|
self.assertTrue(not remote_doc.is_comment())
|
|
self.assertTrue(not remote_doc.is_icon())
|
|
self.assertTrue(not remote_doc.is_file())
|
|
self.assertTrue(not remote_doc.is_image())
|
|
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
|
|
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
|
|
self.assertEqual(new_doc.repository_id(), self._repository_id)
|
|
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
|
|
self.assertEqual(new_doc.document_keywords(), keywords)
|
|
|
|
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
self.assertEqual(new_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
remote_doc[Document.DOCUMENT_DESCRIPTION_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
description)
|
|
self.assertEqual(new_doc[Document.DOCUMENT_TITLE_ID],
|
|
remote_doc[Document.DOCUMENT_TITLE_ID])
|
|
self.assertEqual(remote_doc[Document.DOCUMENT_TITLE_ID], title)
|
|
self.assertEqual(new_doc.document_timestamp(),
|
|
remote_doc.document_timestamp())
|
|
self.assertEqual(new_doc.document_keywords(),
|
|
remote_doc.document_keywords())
|
|
|
|
# now try to remove
|
|
webserv.remove_credentials()
|
|
try:
|
|
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID])
|
|
# webserv.AuthenticationRequired should be raised
|
|
self.assertTrue(False)
|
|
except webserv.AuthenticationRequired:
|
|
webserv.add_credentials(self._fake_user, self._fake_pass)
|
|
self.assertTrue(webserv.credentials_available())
|
|
self.assertEqual(webserv.get_credentials(), self._fake_user)
|
|
# credentials must be valid
|
|
webserv.validate_credentials()
|
|
# now it should not crash
|
|
self.assertTrue(
|
|
webserv.remove_document(
|
|
remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
|
|
# got the new document back, which is the same plus document_id
|
|
finally:
|
|
webserv.remove_credentials()
|
|
|
|
def test_get_documents(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
pk = self._real_package_name
|
|
return self._test_get_documents(pk, webserv.get_documents)
|
|
|
|
def test_get_documents_comments(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
pk = self._real_package_name
|
|
return self._test_get_documents(pk, webserv.get_comments)
|
|
|
|
def test_get_documents_icons(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
pk = self._real_package_name
|
|
return self._test_get_documents(pk, webserv.get_icons)
|
|
|
|
def _test_get_documents(self, pk, webserv_func):
|
|
docs = webserv_func([pk], cache = False)
|
|
self.assertTrue(pk in docs)
|
|
self.assertTrue(isinstance(docs[pk], DocumentList))
|
|
for vals in docs.values():
|
|
self.assertTrue(isinstance(vals, DocumentList))
|
|
self.assertEqual(vals.package_name(), pk)
|
|
self.assertTrue(isinstance(vals.has_more(), int))
|
|
self.assertEqual(vals.offset(), 0)
|
|
for val in vals:
|
|
self.assertTrue(isinstance(val, Document))
|
|
self.assertEqual(val.repository_id(), self._repository_id)
|
|
# TODO: use constants instead of strings
|
|
self.assertEqual(sorted(val.keys()),
|
|
sorted([DocumentFactory.DOCUMENT_USERNAME_ID,
|
|
Document.DOCUMENT_REPOSITORY_ID,
|
|
Document.DOCUMENT_DESCRIPTION_ID,
|
|
Document.DOCUMENT_TITLE_ID,
|
|
Document.DOCUMENT_URL_ID,
|
|
Document.DOCUMENT_TIMESTAMP_ID,
|
|
Document.DOCUMENT_DOCUMENT_TYPE_ID,
|
|
Document.DOCUMENT_KEYWORDS_ID,
|
|
Document.DOCUMENT_DATA_ID,
|
|
Document.DOCUMENT_DOCUMENT_ID]))
|
|
self.assertTrue(isinstance(
|
|
val[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
const_get_stringtype()))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_REPOSITORY_ID],
|
|
const_get_stringtype()))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_TITLE_ID],
|
|
const_get_stringtype()))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_DESCRIPTION_ID],
|
|
const_get_stringtype()))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_TIMESTAMP_ID],
|
|
float))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_DOCUMENT_TYPE_ID],
|
|
int))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_DOCUMENT_ID],
|
|
int))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_DATA_ID],
|
|
const_get_stringtype()))
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_KEYWORDS_ID],
|
|
const_get_stringtype()))
|
|
if val[Document.DOCUMENT_URL_ID]:
|
|
self.assertTrue(isinstance(val[Document.DOCUMENT_URL_ID],
|
|
const_get_stringtype()))
|
|
else:
|
|
self.assertTrue(val[Document.DOCUMENT_URL_ID] is None)
|
|
|
|
def test_get_icons(self):
|
|
pk = self._real_package_name
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
docs = webserv.get_icons([pk], cache = False)
|
|
self.assertTrue(pk in docs)
|
|
self.assertTrue(isinstance(docs[pk], DocumentList))
|
|
for vals in docs.values():
|
|
for val in vals:
|
|
self.assertTrue(isinstance(val, Document))
|
|
self.assertTrue(val.is_icon())
|
|
self.assertTrue(not val.is_image())
|
|
|
|
def test_get_comments(self):
|
|
pk = self._real_package_name
|
|
webserv = self._factory.new(self._repository_id)
|
|
docs = webserv.get_comments([pk], cache = False)
|
|
self.assertTrue(pk in docs)
|
|
self.assertTrue(docs[pk])
|
|
for vals in docs.values():
|
|
for val in vals:
|
|
self.assertTrue(isinstance(val, Document))
|
|
self.assertTrue(val.is_comment())
|
|
|
|
def test_report_error(self):
|
|
params = {}
|
|
params['arch'] = etpConst['currentarch']
|
|
params['stacktrace'] = "zomg Vogons!"
|
|
params['name'] = "Ford Prefect"
|
|
params['email'] = "ford@betelgeuse.gal"
|
|
params['version'] = etpConst['entropyversion']
|
|
params['errordata'] = "towel forgotten"
|
|
params['description'] = "don't panic"
|
|
params['arguments'] = ' '.join(sys.argv)
|
|
params['uid'] = etpConst['uid']
|
|
params['system_version'] = "N/A"
|
|
params['system_version'] = "42"
|
|
params['processes'] = "none"
|
|
params['lsof'] = "none"
|
|
params['lspci'] = "none"
|
|
params['dmesg'] = "none"
|
|
params['locale'] = "Vogonish"
|
|
params['repositories.conf'] = "empty"
|
|
params['client.conf'] = "---NA---"
|
|
webserv = self._factory.new(self._repository_id)
|
|
outcome = webserv.report_error(params)
|
|
self.assertEqual(outcome, None)
|
|
|
|
def test_data_send_available(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertEqual(webserv.data_send_available(), True)
|
|
|
|
def test_factory_comment(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
doc = doc_factory.comment("username", "comment", "title", keywords)
|
|
self.assertTrue(doc.is_comment())
|
|
self.assertTrue(not doc.is_image())
|
|
self.assertTrue(not doc.is_icon())
|
|
self.assertTrue(not doc.is_file())
|
|
self.assertTrue(not doc.is_video())
|
|
self.assertEqual(doc.document_id(), None) # it's new!
|
|
self.assertEqual(doc.repository_id(), self._repository_id)
|
|
self.assertEqual(doc.document_type(), Document.COMMENT_TYPE_ID)
|
|
self.assertEqual(doc.document_keywords(), keywords)
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID], "username")
|
|
self.assertEqual(doc[Document.DOCUMENT_DATA_ID], "comment")
|
|
self.assertEqual(doc[Document.DOCUMENT_TITLE_ID], "title")
|
|
|
|
def test_factory_image(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
try:
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
doc = doc_factory.image("username", tmp_f, "title",
|
|
"description", keywords)
|
|
self.assertTrue(not doc.is_comment())
|
|
self.assertTrue(doc.is_image())
|
|
self.assertTrue(not doc.is_icon())
|
|
self.assertTrue(not doc.is_file())
|
|
self.assertTrue(not doc.is_video())
|
|
self.assertEqual(doc.document_id(), None) # it's new!
|
|
self.assertEqual(doc.repository_id(), self._repository_id)
|
|
self.assertEqual(doc.document_type(), Document.IMAGE_TYPE_ID)
|
|
self.assertEqual(doc.document_keywords(), keywords)
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
"username")
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
|
|
(os.path.basename(tmp_f.name), tmp_f))
|
|
self.assertEqual(doc[Document.DOCUMENT_TITLE_ID], "title")
|
|
self.assertEqual(doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
"description")
|
|
finally:
|
|
os.remove(tmp_path)
|
|
|
|
def test_factory_icon(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
try:
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
doc = doc_factory.icon("username", tmp_f, "title",
|
|
"description", keywords)
|
|
self.assertTrue(not doc.is_comment())
|
|
self.assertTrue(not doc.is_image())
|
|
self.assertTrue(doc.is_icon())
|
|
self.assertTrue(not doc.is_file())
|
|
self.assertTrue(not doc.is_video())
|
|
self.assertEqual(doc.document_id(), None) # it's new!
|
|
self.assertEqual(doc.repository_id(), self._repository_id)
|
|
self.assertEqual(doc.document_type(), Document.ICON_TYPE_ID)
|
|
self.assertEqual(doc.document_keywords(), keywords)
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
"username")
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
|
|
(os.path.basename(tmp_f.name), tmp_f))
|
|
self.assertEqual(doc[Document.DOCUMENT_TITLE_ID],
|
|
"title")
|
|
self.assertEqual(doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
"description")
|
|
finally:
|
|
os.remove(tmp_path)
|
|
|
|
def test_factory_file(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
try:
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
doc = doc_factory.file("username", tmp_f, "title",
|
|
"description", keywords)
|
|
self.assertTrue(not doc.is_comment())
|
|
self.assertTrue(not doc.is_image())
|
|
self.assertTrue(not doc.is_icon())
|
|
self.assertTrue(doc.is_file())
|
|
self.assertTrue(not doc.is_video())
|
|
self.assertEqual(doc.document_id(), None) # it's new!
|
|
self.assertEqual(doc.repository_id(), self._repository_id)
|
|
self.assertEqual(doc.document_type(), Document.FILE_TYPE_ID)
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
"username")
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
|
|
(os.path.basename(tmp_f.name), tmp_f))
|
|
self.assertEqual(doc[Document.DOCUMENT_TITLE_ID],
|
|
"title")
|
|
self.assertEqual(doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
"description")
|
|
self.assertEqual(doc.document_keywords(), keywords)
|
|
finally:
|
|
os.remove(tmp_path)
|
|
|
|
def test_factory_video(self):
|
|
webserv = self._factory.new(self._repository_id)
|
|
self.assertTrue(webserv.service_available(cache = False))
|
|
doc_factory = webserv.document_factory()
|
|
keywords = "keyword1 keyword2"
|
|
|
|
tmp_fd, tmp_path = tempfile.mkstemp()
|
|
try:
|
|
with open(tmp_path, "ab+") as tmp_f:
|
|
doc = doc_factory.video("username", tmp_f, "title",
|
|
"description", keywords)
|
|
self.assertTrue(not doc.is_comment())
|
|
self.assertTrue(not doc.is_image())
|
|
self.assertTrue(not doc.is_icon())
|
|
self.assertTrue(not doc.is_file())
|
|
self.assertTrue(doc.is_video())
|
|
self.assertEqual(doc.document_id(), None) # it's new!
|
|
self.assertEqual(doc.repository_id(), self._repository_id)
|
|
self.assertEqual(doc.document_type(), Document.VIDEO_TYPE_ID)
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
|
|
"username")
|
|
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
|
|
(os.path.basename(tmp_f.name), tmp_f))
|
|
self.assertEqual(doc[Document.DOCUMENT_TITLE_ID],
|
|
"title")
|
|
self.assertEqual(doc[Document.DOCUMENT_DESCRIPTION_ID],
|
|
"description")
|
|
self.assertEqual(doc.document_keywords(), keywords)
|
|
finally:
|
|
os.remove(tmp_path)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if "--debug" in sys.argv:
|
|
sys.argv.remove("--debug")
|
|
from entropy.const import etpUi
|
|
etpUi['debug'] = True
|
|
unittest.main()
|
|
entropy.tools.kill_threads()
|
|
raise SystemExit(0)
|