[entropy.services] introduce the new Entropy Web Services Client library.

This commit is contained in:
Fabio Erculiani
2011-03-26 16:21:27 +01:00
parent 7f5cc1972b
commit 884bf31e6d
4 changed files with 2462 additions and 1 deletions

View File

@@ -21,17 +21,20 @@ class LoadersMixin:
def __init__(self):
self._spm_cache = {}
# instantiate here to avoid runtime loading, that can cause failures
# during complete system upgrades
from entropy.client.interfaces.trigger import Trigger
from entropy.client.interfaces.repository import Repository
from entropy.client.interfaces.package import Package
from entropy.client.interfaces.sets import Sets
from entropy.client.misc import FileUpdates
from entropy.client.services.interfaces import ClientWebServiceFactory
self.__package_loader = Package
self.__repository_loader = Repository
self.__trigger_loader = Trigger
self.__sets_loader = Sets
self.__package_files_loader = FileUpdates
self.__webservice_factory = ClientWebServiceFactory
def Sets(self):
"""
@@ -92,6 +95,17 @@ class LoadersMixin:
kwargs['gpg'] = client_data['gpg']
return self.__repository_loader(self, *args, **kwargs)
def WebServices(self):
"""
Load Entropy Web Services Factory interface, that can be used
to obtain a WebService object that is able to communicate with
repository remote services, if available.
@return: WebServicesFactory instance object
@rtype: entropy.client.services.interfaces.WebServicesFactory
"""
return self.__webservice_factory(self)
def Spm(self):
"""
Load Source Package Manager instance object

View File

@@ -0,0 +1,940 @@
# -*- coding: utf-8 -*-
"""
@author: Fabio Erculiani <lxnay@sabayon.org>
@contact: lxnay@sabayon.org
@copyright: Fabio Erculiani
@license: GPL-2
B{Entropy Client Repository Web Services Client interface}.
"""
__all__ = ["ClientWebServiceFactory", "ClientWebService", "Document",
"DocumentFactory"]
import os
import time
from entropy.const import const_get_stringtype
from entropy.i18n import _
from entropy.services.client import WebServiceFactory, WebService
class Document(dict):
"""
User Generated Content Document object. It inherits a dictionary and
contains metadata bound to a single content item (such as, a user comment,
image, etc).
Since the metadata format is "volatile" at the moment, you should use
Document.get() to retrieve metadata objects (this is a hash table) rather
than relying on __getitem__ (using obj[item_key]).
"""
# basic document types supported
UNKNOWN_TYPE_ID = -1
COMMENT_TYPE_ID = 1
# BBCODE_TYPE_ID = 2
IMAGE_TYPE_ID = 3
FILE_TYPE_ID = 4
VIDEO_TYPE_ID = 5
# This is a new ID, new documents will have this
ICON_TYPE_ID = 6
DESCRIPTION_PLURAL = {
UNKNOWN_TYPE_ID: _("Unknown documents"),
COMMENT_TYPE_ID: _('Comments'),
# BBCODE_TYPE_ID: _('BBcode Documents'),
IMAGE_TYPE_ID: _('Images/Screenshots'),
FILE_TYPE_ID: _('Generic Files'),
VIDEO_TYPE_ID: _('Videos'),
ICON_TYPE_ID: _('Icons'),
}
DESCRIPTION_SINGULAR = {
UNKNOWN_TYPE_ID: _("Unknown document"),
COMMENT_TYPE_ID: _('Comment'),
# BBCODE_TYPE_ID: _('BBcode Document'),
IMAGE_TYPE_ID: _('Image/Screenshot'),
FILE_TYPE_ID: _('Generic File'),
VIDEO_TYPE_ID: _('Video'),
ICON_TYPE_ID: _('Icon'),
}
# backward compatibility
PACKAGE_ICON_TITLE_ID = "__icon__"
# Document hash table key pointing to document repository id
# see Document.repository_id()
DOCUMENT_REPOSITORY_ID = "repository_id"
# Document hash table key pointing to document id
# see Document.document_id()
DOCUMENT_DOCUMENT_ID = "document_id"
# Document hash table key pointing to document type id
# see Document.document_type_id()
DOCUMENT_DOCUMENT_TYPE_ID = "document_type_id"
# Document hash table key pointing to document data
# see Document.document_data()
DOCUMENT_DATA_ID = "ddata"
# Document hash table key pointing to document keywords (tags...)
# see Document.document_keywords()
DOCUMENT_KEYWORDS_ID = "keywords"
# Document hash table key pointing to document timestamp (float)
DOCUMENT_TIMESTAMP_ID = "ts"
def __init__(self, repository_id, document_id, document_type_id):
"""
Document constructor.
@param repository_id: repository identifier
@type repository_id: string
@param document_id: document unique identifier
@type document_id: int
@param document_id: document type identifier
@type document_id: int
"""
self._document_type_revmap = {
1: Document.COMMENT_TYPE_ID,
# 2: Document.BBCODE_TYPE_ID,
3: Document.IMAGE_TYPE_ID,
4: Document.FILE_TYPE_ID,
5: Document.VIDEO_TYPE_ID,
6: Document.ICON_TYPE_ID,
}
obj = {
Document.DOCUMENT_REPOSITORY_ID: repository_id,
Document.DOCUMENT_DOCUMENT_ID: document_id,
Document.DOCUMENT_DOCUMENT_TYPE_ID: document_type_id,
}
self.update(obj)
self._add_base_metadata()
def _add_base_metadata(self):
"""
Add base metadata to Document object.
"""
if Document.DOCUMENT_TIMESTAMP_ID not in self:
self[Document.DOCUMENT_TIMESTAMP_ID] = time.time()
def repository_id(self):
"""
Return the currently set repository identifier.
@return: the repository identifier
@rtype: string
"""
return self[Document.DOCUMENT_REPOSITORY_ID]
def document_id(self):
"""
Return the currently set document unique identifier.
@return: document unique identifier
@rtype: int
"""
return self[Document.DOCUMENT_DOCUMENT_ID]
def document_type(self):
"""
Return the document type identifier, which is always one of:
Document.UNKNOWN_TYPE_ID, Document.COMMENT_TYPE_ID,
Document.BBCODE_TYPE_ID, Document.IMAGE_TYPE_ID,
Document.FILE_TYPE_ID, Document.VIDEO_TYPE_ID
@return: the document type identifier
@rtype: int
"""
return self._document_type_revmap.get(
self[Document.DOCUMENT_DOCUMENT_TYPE_ID],
Document.UNKNOWN_TYPE_ID)
def document_data(self):
"""
Return encapsulated document data. This is an opaque object, usually
string, that depends on the document type.
If there is no document data, None is returned.
@return: document data
@rtype: object or None
"""
return self.get(Document.DOCUMENT_DATA_ID)
def document_keywords(self):
"""
Return a list of keywords bound to this document.
List is always returned, even if metadatum is not available.
@return: document keywords
@rtype: list
"""
return self.get(Document.DOCUMENT_KEYWORDS_ID)
def document_timestamp(self):
"""
Return the document timestamp. If value is not available (unlikely)
the returned value will be 0.0.
@return: document timestamp
@rtype: float
"""
return self.get(Document.DOCUMENT_TIMESTAMP_ID)
def is_icon(self):
"""
Return whether this Document is an Icon document.
@return: True, if Document is representing an icon
@rtype: bool
"""
if self.document_type() == Document.ICON_TYPE_ID:
return True
# backward compatibility
if not self.is_image():
return False
if self.get(DocumentFactory.DOCUMENT_TITLE_ID) == \
Document.PACKAGE_ICON_TITLE_ID:
return True
return False
def is_image(self):
"""
Return whether this Document is an Image document.
@return: True, if Document is representing an image
@rtype: bool
"""
return self.document_type() == Document.IMAGE_TYPE_ID
def is_comment(self):
"""
Return whether this Document is a Comment document.
@return: True, if Document is representing a Comment
@rtype: bool
"""
return self.document_type() == Document.COMMENT_TYPE_ID
def is_file(self):
"""
Return whether this Document is a File document.
@return: True, if Document is representing a File
@rtype: bool
"""
return self.document_type() == Document.FILE_TYPE_ID
def is_video(self):
"""
Return whether this Document is a Video document.
@return: True, if Document is representing a Video
@rtype: bool
"""
return self.document_type() == Document.VIDEO_TYPE_ID
class DocumentFactory(object):
"""
Class to generate valid, new Document objects.
"""
# Document hash table key for the document title
DOCUMENT_TITLE_ID = "title"
# Document hash table key for the document username
DOCUMENT_USERNAME_ID = "username"
# Document hash table key for the document description
DOCUMENT_DESCRIPTION_ID = "description"
# Payload metadatum is only available on temporary, to-be-uploaded
# Document objects. Can contain a file object or any other pointer
# to a serializable resource.
DOCUMENT_PAYLOAD_ID = "payload"
# Maximum string length, used for input validation
MAX_STRING_LENGTH = 4000
def __init__(self, repository_id):
"""
DocumentFactory constructor.
@param repository_id: repository identifier
@type repository_id: string
"""
self._repository_id = repository_id
def _validate_strings(self, *strings):
"""
Validate input strings.
@raise AssertionError: if one of the input objects is invalid
"""
for string in strings:
if not isinstance(string, const_get_stringtype()):
raise AssertionError("invalid string type detected")
if len(string) > DocumentFactory.MAX_STRING_LENGTH:
raise AssertionError("string is too long")
def _validate_string_list(self, string_list):
"""
Validate input string list.
@raise AttributeError: if one of the input objects is invalid
"""
self._validate_strings(*string_list)
def _validate_file_object(self, f_obj):
"""
Validate input file object.
"""
if not isinstance(f_obj, file):
raise AssertionError("not a file object")
if f_obj.tell() != 0:
raise AssertionError("file position != 0")
if f_obj.closed:
raise AssertionError("file object is closed")
if "w" in f_obj.mode:
raise AssertionError("file object wrong file mode")
def comment(self, username, comment, title, keywords):
"""
Generate a new Comment Document.
@param username: username of the owner of the Document
@type username: string
@param comment: comment text
@type comment: string
@param title: comment title
@type title: string
@param keywords: list of keywords (string)
@type keywords: list
@return: a new Document object
@rtype: Document
"""
self._validate_strings(username, comment, title)
doc = Document(self._repository_id, None, Document.COMMENT_TYPE_ID)
doc[DocumentFactory.DOCUMENT_USERNAME_ID] = username
doc[Document.DOCUMENT_DATA_ID] = comment
doc[DocumentFactory.DOCUMENT_TITLE_ID] = title
doc[Document.DOCUMENT_KEYWORDS_ID] = keywords
return doc
def image(self, username, file_object, title, description, keywords):
"""
Generate a new Image Document.
@param username: username of the owner of the Document
@type username: string
@param file_object: file object pointing to the image file data. Note
that this resource must be closed by the caller once the object
lifecycle is over. Not doing so will cause the application running
out of resources, leading to crashes. To retrieve the filename,
the "name" attribute is read, this won't work for fdopened files.
@type file_object: string
@param title: comment title
@type title: string
@param keywords: list of keywords (string)
@type keywords: list
@return: a new Document object
@rtype: Document
"""
self._validate_strings(username, title, description)
self._validate_string_list(keywords)
self._validate_file_object(file_object)
doc = Document(self._repository_id, None, Document.IMAGE_TYPE_ID)
doc[DocumentFactory.DOCUMENT_USERNAME_ID] = username
doc[DocumentFactory.DOCUMENT_PAYLOAD_ID] = \
(os.path.basename(file_object.name), file_object)
doc[DocumentFactory.DOCUMENT_TITLE_ID] = title
doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID] = description
doc[Document.DOCUMENT_KEYWORDS_ID] = keywords
return doc
def icon(self, username, file_object, title, description, keywords):
"""
Generate a new Icon Document.
@param username: username of the owner of the Document
@type username: string
@param file_object: file object pointing to the image file data. Note
that this resource must be closed by the caller once the object
lifecycle is over. Not doing so will cause the application running
out of resources, leading to crashes. To retrieve the filename,
the "name" attribute is read, this won't work for fdopened files.
@type file_object: string
@param title: comment title
@type title: string
@param keywords: list of keywords (string)
@type keywords: list
@return: a new Document object
@rtype: Document
"""
self._validate_strings(username, title, description)
self._validate_string_list(keywords)
self._validate_file_object(file_object)
doc = Document(self._repository_id, None, Document.ICON_TYPE_ID)
doc[DocumentFactory.DOCUMENT_USERNAME_ID] = username
doc[DocumentFactory.DOCUMENT_PAYLOAD_ID] = \
(os.path.basename(file_object.name), file_object)
doc[DocumentFactory.DOCUMENT_TITLE_ID] = title
doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID] = description
doc[Document.DOCUMENT_KEYWORDS_ID] = keywords
return doc
def video(self, username, file_object, title, description, keywords):
"""
Generate a new Icon Document.
@param username: username of the owner of the Document
@type username: string
@param file_object: file object pointing to the image file data. Note
that this resource must be closed by the caller once the object
lifecycle is over. Not doing so will cause the application running
out of resources, leading to crashes. To retrieve the filename,
the "name" attribute is read, this won't work for fdopened files.
@type file_object: string
@param title: comment title
@type title: string
@param keywords: list of keywords (string)
@type keywords: list
@return: a new Document object
@rtype: Document
"""
self._validate_strings(username, title, description)
self._validate_string_list(keywords)
self._validate_file_object(file_object)
doc = Document(self._repository_id, None, Document.VIDEO_TYPE_ID)
doc[DocumentFactory.DOCUMENT_USERNAME_ID] = username
doc[DocumentFactory.DOCUMENT_PAYLOAD_ID] = \
(os.path.basename(file_object.name), file_object)
doc[DocumentFactory.DOCUMENT_TITLE_ID] = title
doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID] = description
doc[Document.DOCUMENT_KEYWORDS_ID] = keywords
return doc
def file(self, username, file_object, title, description, keywords):
"""
Generate a new File Document.
@param username: username of the owner of the Document
@type username: string
@param file_object: file object pointing to the image file data. Note
that this resource must be closed by the caller once the object
lifecycle is over. Not doing so will cause the application running
out of resources, leading to crashes. To retrieve the filename,
the "name" attribute is read, this won't work for fdopened files.
@type file_object: string
@param title: comment title
@type title: string
@param keywords: list of keywords (string)
@type keywords: list
@return: a new Document object
@rtype: Document
"""
self._validate_strings(username, title, description)
self._validate_string_list(keywords)
self._validate_file_object(file_object)
doc = Document(self._repository_id, None, Document.FILE_TYPE_ID)
doc[DocumentFactory.DOCUMENT_USERNAME_ID] = username
doc[DocumentFactory.DOCUMENT_PAYLOAD_ID] = \
(os.path.basename(file_object.name), file_object)
doc[DocumentFactory.DOCUMENT_TITLE_ID] = title
doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID] = description
doc[Document.DOCUMENT_KEYWORDS_ID] = keywords
return doc
class ClientWebServiceFactory(WebServiceFactory):
"""
Main Entropy Client Repository Web Service Factory. Generates
WebService objects that can be used to communicate with the established
web service.
This class should be instantiated by calling
"""
def __init__(self, entropy_client):
"""
Overridden constructor.
"""
WebServiceFactory.__init__(self, ClientWebService, entropy_client)
class ClientWebService(WebService):
# Package maximum and minimum vote boundaries
MAX_VOTE = 5.0
MIN_VOTE = 0.0
class DocumentError(WebService.WebServiceException):
"""
Generic Document error object. Raised when Document object is
invalid.
"""
def document_factory(self):
"""
Return a new DocumentFactory object, used to create new Document
objects.
@return: a DocumentFactory object
@rtype: DocumentFactory
"""
return DocumentFactory(self._repository_id)
def get_votes(self, package_names, cache = True):
"""
For given package names, return the current vote. For missing votes
or invalid package_name, None is assigned.
@param package_names: list of package names, either atoms or keys
@type package_names: list
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@return: mapping composed by package name as key and value as vote
(float)
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
params = {
"package_names": " ".join(package_names)
}
return self._method_getter("get_votes", params, cache = cache,
require_credentials = False)
def get_downloads(self, package_names, cache = True):
"""
For given package names, return the current download counter.
Packages having no download info will get None instead of int.
@param package_names: list of package names, either atoms or keys
@type package_names: list
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@return: mapping composed by package name as key and downloads as value
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
params = {
"package_names": " ".join(package_names)
}
return self._method_getter("get_downloads", params, cache = cache,
require_credentials = False)
def add_vote(self, package_name, vote):
"""
For given package name, add a vote.
@param package_name: package name, either atom or key
@type package_name: string
@return: True, if vote was recorded, False otherwise
@rtype: bool
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
@raise WebService.RequestError: if vote is invalid.
@raise WebService.AuthenticationRequired: if login information are
not available (user interface should raise a login form, validate
the credentials and retry the function call here)
"""
valid = True
if vote > ClientWebService.MAX_VOTE:
valid = False
elif vote < ClientWebService.MIN_VOTE:
valid = False
if not valid:
raise WebService.RequestError("invalid vote")
params = {
"package_name": package_name,
"vote": vote,
}
valid = self._method_getter("add_vote", params, cache = False,
require_credentials = True)
if valid:
# NOTE: we can accept to be non-atomic in this case.
# TODO: cannot remove all the vote cache when just one element gets
# tained
self._drop_cached("get_votes")
return valid
def add_downloads(self, package_names):
"""
Notify that a list of packages have been downloaded successfully.
@param package_names: list of package names, either atoms or keys
@type package_names: list
@return: True, if download information was recorded, False otherwise
@rtype: bool
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
@raise WebService.AuthenticationRequired: if login information are
not available (user interface should raise a login form, validate
the credentials and retry the function call here)
"""
params = {
"package_names": " ".join(package_names),
}
valid = self._method_getter("add_downloads", params, cache = False,
require_credentials = True)
if valid:
# NOTE: we can accept to be non-atomic in this case.
# TODO: cannot remove all the vote cache when just one element gets
# tained
self._drop_cached("get_downloads")
return valid
def get_icons(self, package_names, cache = True):
"""
For given package names, return the current Document icon object list.
Packages having no icon will get empty list as value.
@param package_names: list of names of the packages to query,
either atom or key
@type package_names: list
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@return: mapping composed by package name as key and Document list
as value
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
document_type_filter = [Document.IMAGE_TYPE_ID]
data = self.get_documents(package_names,
document_type_filter = document_type_filter, cache = cache)
icons_data = {}
for key in list(data.keys()):
icons_data[key] = [x for x in data.get(key, []) if x.is_icon()]
return icons_data
def get_comments(self, package_names, cache = True):
"""
For given package names, return the current Document Comment object
list.
Packages having no comments will get empty list as value.
@param package_names: list of names of the packages to query,
either atom or key
@type package_names: list
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@return: mapping composed by package name as key and Document list
as value
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
document_type_filter = [Document.COMMENT_TYPE_ID]
data = self.get_documents(package_names,
document_type_filter = document_type_filter, cache = cache)
icons_data = {}
for key in list(data.keys()):
icons_data[key] = [x for x in data.get(key, []) if x.is_icon()]
return icons_data
def get_documents(self, package_names, document_type_filter = None,
cache = True):
"""
For given package names, return the current Document object list.
Packages having no documents will get empty list as value.
@param package_names: list of names of the packages to query,
either atom or key
@type package_names: list
@keyword document_type_filter: list of document type identifiers (
see Document class) that are required.
@type document_type_filter: list
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@return: mapping composed by package name as key and Document list as
value
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
if document_type_filter is None:
document_type_filter = []
params = {
"package_names": " ".join(package_names),
"filter": " ".join([str(x) for x in document_type_filter]),
}
objs = self._method_getter("get_documents", params, cache = cache,
require_credentials = False)
data = {}
for package_name in package_names:
obj = objs.get(package_name)
if obj is not None:
d_obj = Document(self._repository_id, obj['document_id'],
obj['document_type_id'])
d_obj.update(obj)
obj = d_obj
data[package_name] = obj
return data
def get_documents_by_id(self, document_ids, cache = True):
"""
For given Document object identifiers, return the respective Document
object.
Unavailable Document object identifiers will have None as dict value.
@param document_ids: list of document identifiers (int)
either atom or key
@type document_ids: list
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@return: mapping composed by Document identifier as key and
Document as value
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
params = {
"document_ids": " ".join([str(x) for x in document_ids]),
}
objs = self._method_getter("get_documents_by_id", params, cache = cache,
require_credentials = False)
data = {}
for document_id in document_ids:
obj = objs.get(document_id)
if obj is not None:
d_obj = Document(self._repository_id, obj['document_id'],
obj['document_type_id'])
d_obj.update(obj)
obj = d_obj
data[package_name] = obj
return data
def _drop_document_cache(self):
"""
Drop all on-disk cache items related to document cache.
"""
self._drop_cached("get_documents")
self._drop_cached("get_comments")
self._drop_cached("get_icons")
def add_document(self, document):
"""
Send a new Document object to the service.
This method will return the newly created remote document object, or
raise exceptions in case the operation failed.
@param package_names: list of package names, either atoms or keys
@type package_names: list
@return: the newly created remote Document object
@rtype: Document
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
@raise WebService.AuthenticationRequired: if login information are
not available (user interface should raise a login form, validate
the credentials and retry the function call here)
@raise ClientWebService.DocumentError: if document submitted is
invalid (contains invalid fields)
"""
if not isinstance(document, Document):
raise AttributeError("only accepting Document objects")
if document['document_id'] is not None:
raise WebService.UnsupportedParameters("document is not new")
# This returns None if document is not accepted
remote_document = self._method_getter("add_document", document,
cache = False, require_credentials = True)
if remote_document is None:
raise ClientWebService.DocumentError("Document not accepted")
# NOTE: we can accept to be non-atomic in this case.
self._drop_document_cache()
return remote_document
def remove_document(self, document_id):
"""
Remove a Document (through its id) from the service repository.
@param document_id: Entropy Document identifier
@type document_id: int
@return: True if document has been removed, False if Document doesn't
exist
@rtype: bool
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
@raise WebService.AuthenticationRequired: if login information are
not available (user interface should raise a login form, validate
the credentials and retry the function call here)
"""
params = {
Document.DOCUMENT_DOCUMENT_ID: document_id,
}
result = self._method_getter("remove_document", params,
cache = False, require_credentials = True)
# NOTE: we can accept to be non-atomic in this case.
self._drop_document_cache()
return result
def report_error(self, error_params):
"""
Entropy Client Error reporting method. This is mainly used by
Entropy internal code in order to submit a stacktrace and other data
to Entropy developers. "error_params" consinsts in a dictionary
containing error data in string format as values.
@param error_params: dictionary containing error report data
@type error_params: dict
@raise WebService.UnsupportedParameters: if input parameters are
invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not
available remotely and an error occured (error code passed as
exception argument)
@raise WebService.AuthenticationRequired: if require_credentials
is True and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are
not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web
Service API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
self._method_getter("report_error", error_params,
require_credentials = False, cache = False)

View File

@@ -0,0 +1,890 @@
# -*- coding: utf-8 -*-
"""
@author: Fabio Erculiani <lxnay@sabayon.org>
@contact: lxnay@sabayon.org
@copyright: Fabio Erculiani
@license: GPL-2
B{Entropy Base Repository Web Services client interface}.
"""
__all__ = ["WebServiceFactory", "WebService"]
import sys
import os
import errno
import json
import tempfile
import threading
import hashlib
import urllib
import socket
if sys.hexversion >= 0x3000000:
import http.client as httplib
from io import StringIO
else:
import httplib
from cStringIO import StringIO
import entropy.dump
from entropy.core import Singleton
from entropy.cache import EntropyCacher
from entropy.const import const_debug_write, const_setup_file, etpConst, \
const_convert_to_rawstring, const_isunicode
from entropy.exceptions import EntropyException
import entropy.tools
import entropy.dep
class WebServiceFactory(object):
"""
Base Entropy Repository Web Services Factory. Generates
WebService objects that can be used to communicate with the established
web service.
This is a base class and subclasses should be preferred (example:
entropy.client.services.interfaces.ClientWebServiceFactory)
"""
class InvalidWebServiceFactory(EntropyException):
"""
Raised when an invalid WebService based class is passed.
"""
def __init__(self, web_service_class, entropy_client):
"""
WebServiceFactory constructor.
@param entropy_client: Entropy Client interface
@type entropy_client: entropy.client.interfaces.client.Client
"""
object.__init__(self)
if not issubclass(web_service_class, WebService):
raise InvalidWebServiceFactory("invalid web_service_class")
self._entropy = entropy_client
self._service_class = web_service_class
def new(self, repository_id):
"""
Return a new WebService object for given repository identifier.
@param repository_id: repository identifier
@rtype repository_id: string
@raise WebService.UnsupportedService: if web service is
explicitly unsupported by repository
"""
return self._service_class(self._entropy, repository_id)
class WebService(object):
"""
This is the Entropy Repository Web Services that proxies requests over
an Web Services answering to HTTP POST requests
(either over HTTP or over HTTPS) in the following form:
Given that repositories must ship with a file (in their repository
meta file "packages.db.meta", coming from the server-side repository
directory) called "packages.db.webservices"
(etpConst['etpdatabasewebservicesfile']) containing the HTTP POST base
URL (example: http://packages.sabayon.org/api).
The POST url is composed as follows:
<base_url>/<method name>
Function arguments are then appended in JSON format, so that float, int,
strings and lists are correctly represented.
So, for example, if WebService exposes a method with the following
signature (with the base URL in example above):
float get_vote(string package_name)
The URL referenced will be:
http://packages.sabayon.org/api/get_vote
And the JSON dictionary will contain a key called "package_name" with
package_name value.
For methods requiring authentication, the JSON object will contain
"username" and "password" fields (clear text, so make sure to use HTTPS).
The Response depends on each specific method and it is given in JSON format
too, that is afterwards interpreted by the caller function, that will
always return the expected data format (see respective API documentation).
For more information about how to implement the Web Service, please see
the packages.git Sabayon repository, which contains a Pylons MVC web app.
In general, every JSON response must provide a 'code' field, representing
an HTTP-response alike return code (200 is ok, 500 is server error, 400 is
bad request, etc) and a 'message' field, containing the error message (if
no error, 'message' is usually empty). The RPC result is put inside the
'r' field.
This is a base class, and you should really implement a subclass providing
your own API methods, and use _method_getter().
"""
# Supported communcation protocols
SUPPORTED_URL_SCHEMAS = ("http", "https")
# package icon metadata identifier
PKG_ICON_IDENTIFIER = "__icon__"
# Currently supported Web Service API level
# an API level defines a set of available remote calls and their data
# structure
SUPPORTED_API_LEVEL = 1
# Default common Web Service responses
WEB_SERVICE_RESPONSE_CODE_OK = 200
WEB_SERVICE_INVALID_CREDENTIALS_CODE = 450
class WebServiceException(EntropyException):
"""
Base WebService exception class.
"""
class UnsupportedService(WebServiceException):
"""
Raised when Repository doesn't seem to support any Web Service
feature.
"""
class UnsupportedParameters(WebServiceException):
"""
Raised when input parameters cannot be converted to JSON.
Probably due to invalid input data.
"""
class RequestError(WebServiceException):
"""
If the request cannot be satisfied by the remote web service.
"""
class AuthenticationRequired(WebServiceException):
"""
When a method requiring valid user credentials is called without
being logged in.
"""
class AuthenticationFailed(WebServiceException):
"""
When credentials are stored locally but don't seem to work against
the Web Service.
"""
class MethodNotAvailable(WebServiceException):
"""
When calling a remote method that is not available.
"""
class MalformedResponse(WebServiceException):
"""
If JSON response cannot be converted back to dict.
"""
class UnsupportedAPILevel(WebServiceException):
"""
If this client and the Web Service expose a different API level.
"""
class MethodResponseError(WebServiceException):
"""
If the request has been accepted, but its computation stopped for
some reason. The encapsulated data contains the error code.
"""
def __init__(self, entropy_client, repository_id):
"""
WebService constructor.
@param entropy_client: Entropy Client interface
@type entropy_client: entropy.client.interfaces.client.Client
@param repository_id: repository identifier
@rtype repository_id: string
"""
self._cache_dir_lock = threading.RLock()
self._transfer_callback = None
self._entropy = entropy_client
self._repository_id = repository_id
self.__auth_storage = None
self.__settings = None
self.__cacher = None
self._default_timeout_secs = 5.0
self.__credentials_validated = False
# check availability
_repository_data = self._settings['repositories']['available'].get(
self._repository_id)
if _repository_data is None:
raise WebService.UnsupportedService("unsupported")
self._repository_data = _repository_data
web_services_conf = self._repository_data['webservices_config']
# mainly for debugging purposes, hidden and undocumented
override_request_url = os.getenv("ETP_OVERRIDE_REQUEST_URL")
if override_request_url is not None:
_remote_url = override_request_url
else:
try:
with open(web_services_conf, "r") as web_f:
# currently, in this file there is only the remote base URL
_remote_url = web_f.readline().strip()
except (OSError, IOError) as err:
const_debug_write(__name__, "WebService.__init__ error: %s" % (
err,))
raise WebService.UnsupportedService(err)
url_obj = entropy.tools.spliturl(_remote_url)
if url_obj.scheme not in WebService.SUPPORTED_URL_SCHEMAS:
raise WebService.UnsupportedService("unsupported url")
self._request_url = _remote_url
self._request_protocol = url_obj.scheme
self._request_host = url_obj.netloc
self._request_path = url_obj.path
const_debug_write(__name__, "WebService loaded, url: %s" % (
self._request_url,))
def _set_transfer_callback(self, callback):
"""
Set a transfer progress callback function.
@param transfer_callback: this callback function can be used to
show a progress status to user, if passed, it must be a function
accepting 3 input parameters: (int transfered, int total,
bool download). The last parameter is True, when progress is about
download, False if upload. If no transfer information is declared,
total might be -1.
@param transfer_callback: callable
"""
self._transfer_callback = callback
@property
def _settings(self):
"""
Get SystemSettings instance
"""
if self.__settings is None:
self.__settings = self._entropy.Settings()
return self.__settings
@property
def _arch(self):
"""
Get currently running Entropy architecture
"""
return self._settings['repositories']['arch']
@property
def _product(self):
"""
Get currently running Entropy product
"""
return self._settings['repositories']['product']
@property
def _branch(self):
"""
Get currently running Entropy branch
"""
return self._settings['repositories']['branch']
@property
def _authstore(self):
"""
Repository authentication configuration storage interface.
Makes possible to retrieve on-disk stored user credentials.
"""
if self.__auth_storage is None:
self.__auth_storage = AuthenticationStorage()
return self.__auth_storage
@property
def _cacher(self):
if self.__cacher is None:
self.__cacher = EntropyCacher()
return self.__cacher
def _generate_user_agent(self, function_name):
"""
Generate a standard (entropy services centric) HTTP User Agent.
"""
uname = os.uname()
user_agent = "Entropy.Services/%s (compatible; %s; %s: %s %s %s)" % (
etpConst['entropyversion'],
"Entropy",
function_name,
uname[0],
uname[4],
uname[2],
)
return user_agent
def _encode_multipart_form(self, params, file_params, boundary):
"""
Encode parameters and files into a valid HTTP multipart form data.
NOTE: this method loads the whole file in RAM, HTTP post doesn't work
well for big files anyway.
"""
tmp_fd, tmp_path = tempfile.mkstemp()
tmp_f = os.fdopen(tmp_fd, "ab+")
tmp_f.truncate(0)
crlf = '\r\n'
for key, value in params.items():
tmp_f.write("--" + boundary + crlf)
tmp_f.write("Content-Disposition: form-data; name=\"%s\"" % (
key,))
tmp_f.write(crlf + crlf + value + crlf)
for key, (f_name, f_obj) in file_params.items():
tmp_f.write("--" + boundary + crlf)
tmp_f.write(
"Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"" % (
key, f_name,))
tmp_f.write(crlf)
tmp_f.write("Content-Type: application/octet-stream" + crlf)
tmp_f.write("Content-Transfer-Encoding: binary" + crlf + crlf)
f_obj.seek(0)
while True:
chunk = f_obj.read(65536)
if not chunk:
break
tmp_f.write(chunk)
tmp_f.write(crlf)
tmp_f.write("--" + boundary + "--" + crlf + crlf)
tmp_f.flush()
return tmp_f, tmp_path
def _generic_post_handler(self, function_name, params, file_params):
"""
Given a function name and the request data (dict format), do the actual
HTTP request and return the response object to caller.
WARNING: params and file_params dict keys must be ASCII string only.
@param function_name: name of the function that called this method
@type function_name: string
@param params: POST parameters
@type params: dict
@param file_params: mapping composed by file names as key and tuple
composed by (file_name, file object) as values
@type file_params: dict
@return: tuple composed by the server response string or None
(in case of empty response) and the HTTPResponse object (useful
for checking response status)
@rtype: tuple
"""
multipart_boundary = "---entropy.services,boundary---"
request_path = self._request_path.rstrip("/") + "/" + function_name
const_debug_write(__name__,
"WebService _generic_post_handler, calling: %s at %s -- %s,"
" tx_callback: %s" % (self._request_host, request_path,
params, self._transfer_callback,))
connection = None
try:
if self._request_protocol == "http":
connection = httplib.HTTPConnection(self._request_host,
timeout = self._default_timeout_secs)
elif self._request_protocol == "https":
connection = httplib.HTTPSConnection(self._request_host,
timeout = self._default_timeout_secs)
else:
raise WebService.RequestError("invalid request protocol")
headers = {
"Accept": "text/plain",
"User-Agent": self._generate_user_agent(function_name),
}
if file_params is None:
file_params = {}
# autodetect file parameters in params
for k in list(params.keys()):
if isinstance(params[k], (tuple, list)) \
and (len(params[k]) == 2):
f_name, f_obj = params[k]
if isinstance(f_obj, file):
file_params[k] = params[k]
del params[k]
elif const_isunicode(params[k]):
# convert to raw string
params[k] = const_convert_to_rawstring(params[k],
from_enctype = "utf-8")
body = None
if not file_params:
headers["Content-Type"] = "application/x-www-form-urlencoded"
encoded_params = urllib.urlencode(params)
data_size = len(encoded_params)
if self._transfer_callback is not None:
self._transfer_callback(0, data_size, False)
if data_size < 65536:
connection.request("POST", request_path, encoded_params,
headers)
else:
connection.request("POST", request_path, None, headers)
sio = StringIO(encoded_params)
data_size = len(encoded_params)
while True:
chunk = sio.read(65535)
if not chunk:
break
try:
connection.send(chunk)
except socket.error as err:
raise WebService.RequestError(err)
if self._transfer_callback is not None:
self._transfer_callback(sio.tell(),
data_size, False)
# for both ways, send a signal through the callback
if self._transfer_callback is not None:
self._transfer_callback(data_size, data_size, False)
else:
headers["Content-Type"] = "multipart/form-data; boundary=" + \
multipart_boundary
body_file, body_fpath = self._encode_multipart_form(params,
file_params, multipart_boundary)
try:
data_size = body_file.tell()
headers["Content-Length"] = str(data_size)
body_file.seek(0)
if self._transfer_callback is not None:
self._transfer_callback(0, data_size, False)
connection.request("POST", request_path, None, headers)
while True:
chunk = body_file.read(65535)
if not chunk:
break
try:
connection.send(chunk)
except socket.error as err:
raise WebService.RequestError(err)
if self._transfer_callback is not None:
self._transfer_callback(body_file.tell(),
data_size, False)
if self._transfer_callback is not None:
self._transfer_callback(data_size, data_size, False)
finally:
body_file.close()
os.remove(body_fpath)
response = connection.getresponse()
total_length = response.getheader("Content-Length", "-1")
try:
total_length = int(total_length)
except ValueError:
total_length = -1
outcome = ""
current_len = 0
if self._transfer_callback is not None:
self._transfer_callback(current_len, total_length, True)
while True:
chunk = response.read(65536)
if not chunk:
break
outcome += chunk
current_len += len(chunk)
if self._transfer_callback is not None:
self._transfer_callback(current_len, total_length, True)
if self._transfer_callback is not None:
self._transfer_callback(total_length, total_length, True)
if not outcome:
return None, response
return outcome, response
except httplib.HTTPException as err:
raise WebService.RequestError(err)
finally:
if connection is not None:
connection.close()
def _setup_credentials(self, request_params):
"""
This method is automatically called by public API functions to setup
credentials data if available, otherwise user interaction will be
triggered by raising WebService.AuthenticationRequired
"""
creds = self._authstore.get(self._repository_id)
if creds is None:
raise WebService.AuthenticationRequired(self._repository_id)
username, password = creds
request_params['username'], request_params['password'] = \
username, password
def add_credentials(self, username, password):
"""
Add credentials for this repository and store the information into
an user-protected location.
"""
self.__credentials_validated = False
self._authstore.add(self._repository_id, username, password)
def validate_credentials(self):
"""
Validate currently stored credentials (if available) against the
remote service. If credentials are not available,
WebService.AuthenticationRequired is raised.
If credentials are not valid, WebService.AuthenticationFailed is
raised.
@raise WebService.AuthenticationRequired: if credentials are not
available
@raise WebService.AuthenticationFailed: if credentials are not valid
"""
if not self.credentials_available():
raise WebService.AuthenticationRequired("credentials not available")
if not self.__credentials_validated:
# this will raise WebService.AuthenticationFailed if credentials
# are invalid
self._method_getter("validate_credentials", {}, cache = False,
require_credentials = True)
self.__credentials_validated = True
def credentials_available(self):
"""
Return whether credentials are stored locally or not.
Please note that credentials can be stored properly but considered
invalid remotely.
@return: True, if credentials are available
@rtype: bool
"""
return self._authstore.get(self._repository_id) is not None
def get_credentials(self):
"""
Return the username string stored in the authentication storage, if any.
Otherwise return None.
@return: the username string stored in the authentication storage
@rtype: string or None
"""
creds = self._authstore.get(self._repository_id)
if creds is not None:
username, _pass = creds
return username
def remove_credentials(self):
"""
Remove any credential bound to the repository from on-disk storage.
@return: True, if credentials existed and got removed
@rtype: bool
"""
self.__credentials_validated = False
return self._authstore.remove(self._repository_id)
CACHE_DIR = os.path.join(etpConst['entropyworkdir'], "websrv_cache")
def _get_cache_key(self, method, params):
"""
Return on disk cache file name as key, given a method name and its
parameters.
"""
hash_str = repr(params)
if sys.hexversion >= 0x3000000:
hash_str = hash_str.encode("utf-8")
sha = hashlib.sha1()
sha.update(hash_str)
return method + "_" + sha.hexdigest()
def _get_cached(self, cache_key):
"""
Return an on-disk cached object for given cache key.
"""
with self._cache_dir_lock:
return self._cacher.pop(cache_key, cache_dir = WebService.CACHE_DIR)
def _set_cached(self, cache_key, data):
"""
Save a cache item to disk.
"""
with self._cache_dir_lock:
return self._cacher.save(cache_key, data,
cache_dir = WebService.CACHE_DIR)
def _drop_cached(self, method):
"""
Drop all on-disk cache for given method.
"""
with self._cache_dir_lock:
cache_dir = WebService.CACHE_DIR
for currentdir, subdirs, files in os.walk(cache_dir):
hostile_files = [os.path.join(currentdir, x) for x in \
files if x.startswith(method + "_")]
for path in hostile_files:
try:
os.remove(path)
except OSError as err:
# avoid race conditions
if err.errno != errno.ENOENT:
raise
def _method_getter(self, func_name, params, cache = True,
require_credentials = False, file_params = None):
"""
Given a function name and request parameters, do all the duties required
to get a response from the Web Service. This method raises several
exceptions, that have to be advertised on public methods as well.
@param func_name: API function name
@type func_name: string
@param params: dictionary object that will be converted into a JSON
request string
@type params: dict
@keyword cache: True means use on-disk cache if available?
@type cache: bool
@keyword require_credentials: True means that credentials will be added
to the request, if credentials are not available in the local
authentication storage, WebService.AuthenticationRequired is
raised
@type require_credentials: bool
@param file_params: mapping composed by file names as key and tuple
composed by (file_name, file object) as values
@type file_params: dict
@return: the JSON response (dict format)
@rtype: dict
@raise WebService.UnsupportedParameters: if input parameters are invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not available
remotely and an error occured (error code passed as exception
argument)
@raise WebService.AuthenticationRequired: if require_credentials is True
and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web Service
API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
if require_credentials:
# this can raise AuthenticationRequired
self._setup_credentials(params)
cache_key = None
if cache:
cache_key = self._get_cache_key(func_name, params)
obj = self._get_cached(cache_key)
if obj is not None:
const_debug_write(__name__, "WebService.%s(%s) = cached %s" % (
func_name, params, obj,))
return obj
const_debug_write(__name__, "WebService.%s(%s) = NOT cached" % (
func_name, params,))
obj = None
try:
json_response, response = self._generic_post_handler(func_name,
params, file_params)
http_status = response.status
if http_status not in (httplib.OK,):
raise WebService.MethodNotAvailable(http_status)
# try to convert the JSON response
try:
data = json.loads(json_response)
except (ValueError, TypeError) as err:
raise WebService.MalformedResponse(err)
# check API
if data.get("api_rev") != WebService.SUPPORTED_API_LEVEL:
raise WebService.UnsupportedAPILevel(data['api_rev'])
code = data.get("code", -1)
if code == WebService.WEB_SERVICE_INVALID_CREDENTIALS_CODE:
# invalid credentials, ask again login data
raise WebService.AuthenticationFailed(code)
if code != WebService.WEB_SERVICE_RESPONSE_CODE_OK:
raise WebService.MethodResponseError(code)
if "r" not in data:
raise WebService.MalformedResponse("r not found")
obj = data["r"]
return obj
finally:
if cache and (obj is not None):
# store cache
if cache_key is None:
cache_key = self._get_cache_key(func_name, params)
self._set_cached(cache_key, obj)
def service_available(self, cache = True):
"""
Return whether the Web Service is correctly able to answer our requests.
@raise WebService.UnsupportedParameters: if input parameters are invalid
@raise WebService.RequestError: if request cannot be satisfied
@raise WebService.MethodNotAvailable: if API method is not available
remotely and an error occured (error code passed as exception
argument)
@raise WebService.AuthenticationRequired: if require_credentials is True
and credentials are required.
@raise WebService.AuthenticationFailed: if credentials are not valid
@raise WebService.MalformedResponse: if JSON response cannot be
converted back to dict.
@raise WebService.UnsupportedAPILevel: if client API and Web Service
API do not match
@raise WebService.MethodResponseError; if method execution failed
"""
params = locals().copy()
params.pop("self")
params.pop("cache")
return self._method_getter("service_available", params, cache = cache,
require_credentials = False)
def data_send_available(self):
"""
Return whether data send is correctly working. A temporary file with
random content is sent to the service, that would need to calculate
its md5 hash. For security reason, data will be accepted remotely if,
and only if its size is < 256 bytes.
"""
md5 = hashlib.md5()
test_str = const_convert_to_rawstring("")
for x in range(256):
test_str += chr(x)
md5.update(test_str)
expected_hash = md5.hexdigest()
func_name = "data_send_available"
tmp_fd, tmp_path = tempfile.mkstemp()
try:
with os.fdopen(tmp_fd, "ab+") as tmp_f:
tmp_f.write(test_str)
tmp_f.seek(0)
params = {
"test_param": "hello",
}
file_params = {
"test_file": ("test_file.txt", tmp_f),
}
remote_hash = self._method_getter(func_name, params,
cache = False, require_credentials = False,
file_params = file_params)
finally:
os.remove(tmp_path)
const_debug_write(__name__,
"WebService.%s, expected: %s, got: %s" % (
func_name, repr(expected_hash), repr(remote_hash),))
return expected_hash == remote_hash
class AuthenticationStorage(Singleton):
"""
Entropy Web Service authentication credentials storage class.
"""
_AUTH_FILE = ".entropy/id_entropy"
def init_singleton(self):
self.__dump_lock = threading.Lock()
# not loaded, load at very last moment
self.__store = None
# setup auth file path
home = os.getenv("HOME")
self.__auth_file = None
if home is not None:
if os.path.isdir(home) and os.access(home, os.W_OK):
auth_file = os.path.join(home,
AuthenticationStorage._AUTH_FILE)
auth_dir = os.path.dirname(auth_file)
if not os.path.isdir(auth_dir):
self.__auth_file = auth_file
try:
os.makedirs(auth_dir, 0o700)
const_setup_file(auth_dir, etpConst['entropygid'],
0o700)
except (OSError, IOError):
# ouch, no permissions
self.__auth_file = None
if self.__auth_file is None:
# cannot reliably store an auth file, falling back
# to a private temp file
tmp_fd, tmp_path = tempfile.mkstemp()
os.close(tmp_fd)
self.__auth_file = tmp_path
@property
def _authstore(self):
"""
Authentication data object automatically loaded from disk if needed.
"""
if self.__store is None:
store = entropy.dump.loadobj(self.__auth_file,
complete_path = True)
if store is None:
store = {}
elif not isinstance(store, dict):
store = {}
self.__store = store
return self.__store
def save(self):
"""
Save currently loaded authentication configuration to disk.
@return: True, if save was effectively run
@rtype: bool
"""
with self.__dump_lock:
entropy.dump.dumpobj(self.__auth_file, self._authstore,
complete_path = True, custom_permissions = 0o600)
# make sure
try:
const_setup_file(self.__auth_file, etpConst['entropygid'],
0o600)
return True
except (OSError, IOError):
return False
def add(self, repository_id, username, password):
"""
Add authentication credentials to Authentication configuration.
@param repository_id: repository identifier
@type repository_id: string
@param username: the username
@type username: string
@param password: the password
@type password: string
"""
self._authstore[repository_id] = {
'username': username,
'password': password,
}
def remove(self, repository_id):
"""
Remove any credential for given repository identifier.
@param repository_id: repository identifier
@type repository_id: string
@return: True, if removal went fine (if there was something to remove)
@rtype: bool
"""
try:
del self._authstore[repository_id]
return True
except KeyError:
return False
def get(self, repository_id):
"""
Get authentication credentials for given repository identifier.
@return: tuple composed by username, password, or None, if credentials
are not found
@rtype: tuple or None
"""
data = self._authstore.get(repository_id)
if data is None:
return None
return data['username'], data['password']

View File

@@ -0,0 +1,617 @@
# -*- coding: utf-8 -*-
import sys
import os
import tempfile
import unittest
sys.path.insert(0, '../')
sys.path.insert(0, '../../')
from entropy.client.interfaces import Client
from entropy.client.services.interfaces import Document, DocumentFactory
from entropy.const import etpConst, etpUi, const_convert_to_rawstring, \
const_convert_to_unicode
import entropy.tools
import tests._misc as _misc
class EntropyRepositoryTest(unittest.TestCase):
def setUp(self):
sys.stdout.write("%s called\n" % (self,))
sys.stdout.flush()
self._entropy = Client(installed_repo = -1, indexing = False,
xcache = False, repo_validation = False)
self._factory = self._entropy.WebServices()
self._repository_id = etpConst['officialrepositoryid']
self._fake_user = "entropy_unittest"
self._fake_pass = "entropy_unittest"
self._fake_package_name = "app-something/entropy-unittest"
self._fake_package_name_utf8 = const_convert_to_unicode(
"app-something/entropy-unìttest")
self._real_package_name = "media-sound/amarok"
def tearDown(self):
"""
tearDown is run after each test
"""
sys.stdout.write("%s ran\n" % (self,))
sys.stdout.flush()
# calling destroy() and shutdown()
# need to call destroy() directly to remove all the SystemSettings
# plugins because shutdown() doesn't, since it's meant to be called
# right before terminating the process
self._entropy.destroy()
self._entropy.shutdown()
def test_credentials(self):
webserv = self._factory.new(self._repository_id)
webserv.add_credentials("lxnay", "test")
self.assertEqual(webserv.get_credentials(), "lxnay")
self.assertEqual(webserv.credentials_available(), True)
self.assert_(webserv.remove_credentials())
self.assertEqual(webserv.credentials_available(), False)
def test_credentials_utf8(self):
user = const_convert_to_unicode("lxnày")
password = const_convert_to_unicode("pààààss")
webserv = self._factory.new(self._repository_id)
webserv.add_credentials(user, password)
self.assertEqual(webserv.get_credentials(), user)
self.assertEqual(webserv.credentials_available(), True)
self.assert_(webserv.remove_credentials())
self.assertEqual(webserv.credentials_available(), False)
def test_query_utf8(self):
webserv = self._factory.new(self._repository_id)
# this must not crash
vote_data = webserv.get_votes([self._fake_package_name_utf8])
def test_add_vote(self):
webserv = self._factory.new(self._repository_id)
# try with success
webserv.remove_credentials()
try:
webserv.add_vote(self._fake_package_name, 4.0)
# webserv.AuthenticationRequired should be raised
self.assert_(False)
except webserv.AuthenticationRequired:
webserv.add_credentials(self._fake_user, self._fake_pass)
self.assert_(webserv.credentials_available())
self.assertEqual(webserv.get_credentials(), self._fake_user)
# credentials must be valid
webserv.validate_credentials()
# now it should not crash
webserv.add_vote(self._fake_package_name, 4.0)
finally:
webserv.remove_credentials()
# now check back if average vote is still 4.0
vote = webserv.get_votes(
[self._fake_package_name])[self._fake_package_name]
self.assertEqual(vote, 4.0)
def test_add_downloads(self):
webserv = self._factory.new(self._repository_id)
pk = self._fake_package_name
pkg_list = [pk]
cur_downloads = webserv.get_downloads(pkg_list)[pk]
if cur_downloads is None:
cur_downloads = 0
self.assert_(webserv.add_downloads([self._fake_package_name]))
# expect (cur_downloads + 1) now
new_downloads = webserv.get_downloads(pkg_list)[pk]
self.assertEqual(cur_downloads + 1, new_downloads)
def test_add_icon(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
keywords = ["keyword1", "keyword2"]
description = const_convert_to_unicode("descrìption")
title = const_convert_to_unicode("tìtle")
tmp_fd, tmp_path = tempfile.mkstemp()
with open(tmp_path, "ab+") as tmp_f:
tmp_f.write(const_convert_to_rawstring('\x89PNG\x00\x00'))
tmp_f.flush()
tmp_f.seek(0)
doc = doc_factory.icon(self._fake_user, tmp_f, title,
description, keywords)
webserv.remove_credentials()
try:
webserv.add_document(doc)
# webserv.AuthenticationRequired should be raised
self.assert_(False)
except webserv.AuthenticationRequired:
webserv.add_credentials(self._fake_user, self._fake_pass)
self.assert_(webserv.credentials_available())
self.assertEqual(webserv.get_credentials(), self._fake_user)
# credentials must be valid
webserv.validate_credentials()
# now it should not crash
new_doc = webserv.add_document(doc)
# got the new document back, which is the same plus document_id
finally:
webserv.remove_credentials()
# now check back if document is there
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
remote_doc = webserv.get_documents_by_id([doc_id])[doc_id]
self.assert_(new_doc.is_icon())
self.assert_(remote_doc.is_icon())
self.assert_(not remote_doc.is_comment())
self.assert_(not remote_doc.is_image())
self.assert_(not remote_doc.is_video())
self.assert_(not remote_doc.is_file())
self.assertEqual(new_doc.repository_id(), self._repository_id)
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
self.assertEqual(new_doc.document_keywords(), keywords)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
description)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_TITLE_ID],
remote_doc[DocumentFactory.DOCUMENT_TITLE_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_TITLE_ID], title)
self.assertEqual(new_doc.document_timestamp(),
remote_doc.document_timestamp())
self.assertEqual(new_doc.document_keywords(),
remote_doc.document_keywords())
# now try to remove
self.assert_(
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
def test_add_image(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
keywords = ["keyword1", "keyword2"]
description = const_convert_to_unicode("descrìption")
title = const_convert_to_unicode("tìtle")
tmp_fd, tmp_path = tempfile.mkstemp()
with open(tmp_path, "ab+") as tmp_f:
tmp_f.write(const_convert_to_rawstring('\x89PNG\x00\x00'))
tmp_f.flush()
tmp_f.seek(0)
doc = doc_factory.image(self._fake_user, tmp_f, title,
description, keywords)
webserv.remove_credentials()
try:
webserv.add_document(doc)
# webserv.AuthenticationRequired should be raised
self.assert_(False)
except webserv.AuthenticationRequired:
webserv.add_credentials(self._fake_user, self._fake_pass)
self.assert_(webserv.credentials_available())
self.assertEqual(webserv.get_credentials(), self._fake_user)
# credentials must be valid
webserv.validate_credentials()
# now it should not crash
new_doc = webserv.add_document(doc)
# got the new document back, which is the same plus document_id
finally:
webserv.remove_credentials()
# now check back if document is there
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
remote_doc = webserv.get_documents_by_id([doc_id])[doc_id]
self.assert_(new_doc.is_image())
self.assert_(remote_doc.is_image())
self.assert_(not remote_doc.is_comment())
self.assert_(not remote_doc.is_icon())
self.assert_(not remote_doc.is_video())
self.assert_(not remote_doc.is_file())
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
self.assertEqual(new_doc.repository_id(), self._repository_id)
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
self.assertEqual(new_doc.document_keywords(), keywords)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
description)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_TITLE_ID],
remote_doc[DocumentFactory.DOCUMENT_TITLE_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_TITLE_ID], title)
self.assertEqual(new_doc.document_timestamp(),
remote_doc.document_timestamp())
self.assertEqual(new_doc.document_keywords(),
remote_doc.document_keywords())
# now try to remove
self.assert_(
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
def test_add_comment(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
keywords = ["keyword1", "keyword2"]
comment = const_convert_to_unicode("comment hellò")
title = const_convert_to_unicode("tìtle")
doc = doc_factory.comment(self._fake_user, comment, title, keywords)
webserv.remove_credentials()
try:
webserv.add_document(doc)
# webserv.AuthenticationRequired should be raised
self.assert_(False)
except webserv.AuthenticationRequired:
webserv.add_credentials(self._fake_user, self._fake_pass)
self.assert_(webserv.credentials_available())
self.assertEqual(webserv.get_credentials(), self._fake_user)
# credentials must be valid
webserv.validate_credentials()
# now it should not crash
new_doc = webserv.add_document(doc)
# got the new document back, which is the same plus document_id
finally:
webserv.remove_credentials()
# now check back if document is there
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
remote_doc = webserv.get_documents_by_id([doc_id])[doc_id]
self.assert_(new_doc.is_comment())
self.assert_(remote_doc.is_comment())
self.assert_(not remote_doc.is_image())
self.assert_(not remote_doc.is_icon())
self.assert_(not remote_doc.is_video())
self.assert_(not remote_doc.is_file())
self.assertEqual(new_doc.repository_id(), self._repository_id)
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
self.assertEqual(new_doc.document_keywords(), keywords)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
self.assertEqual(remote_doc[Document.DOCUMENT_DATA_ID], comment)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_TITLE_ID],
remote_doc[DocumentFactory.DOCUMENT_TITLE_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_TITLE_ID], title)
self.assertEqual(new_doc.document_timestamp(),
remote_doc.document_timestamp())
self.assertEqual(new_doc.document_keywords(),
remote_doc.document_keywords())
# now try to remove
self.assert_(
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
def test_add_file(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
keywords = ["keyword1", "keyword2"]
description = const_convert_to_unicode("descrìption")
title = const_convert_to_unicode("tìtle")
tmp_fd, tmp_path = tempfile.mkstemp()
with open(tmp_path, "ab+") as tmp_f:
tmp_f.write(const_convert_to_rawstring('BZ2\x00\x00'))
tmp_f.flush()
tmp_f.seek(0)
doc = doc_factory.file(self._fake_user, tmp_f, title,
description, keywords)
webserv.remove_credentials()
try:
webserv.add_document(doc)
# webserv.AuthenticationRequired should be raised
self.assert_(False)
except webserv.AuthenticationRequired:
webserv.add_credentials(self._fake_user, self._fake_pass)
self.assert_(webserv.credentials_available())
self.assertEqual(webserv.get_credentials(), self._fake_user)
# credentials must be valid
webserv.validate_credentials()
# now it should not crash
new_doc = webserv.add_document(doc)
# got the new document back, which is the same plus document_id
finally:
webserv.remove_credentials()
# now check back if document is there
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
remote_doc = webserv.get_documents_by_id([doc_id])[doc_id]
self.assert_(new_doc.is_file())
self.assert_(remote_doc.is_file())
self.assert_(not remote_doc.is_comment())
self.assert_(not remote_doc.is_icon())
self.assert_(not remote_doc.is_video())
self.assert_(not remote_doc.is_image())
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
self.assertEqual(new_doc.repository_id(), self._repository_id)
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
self.assertEqual(new_doc.document_keywords(), keywords)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
description)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_TITLE_ID],
remote_doc[DocumentFactory.DOCUMENT_TITLE_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_TITLE_ID], title)
self.assertEqual(new_doc.document_timestamp(),
remote_doc.document_timestamp())
self.assertEqual(new_doc.document_keywords(),
remote_doc.document_keywords())
# now try to remove
self.assert_(
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
def test_add_video(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
keywords = ["keyword1", "keyword2"]
description = const_convert_to_unicode("descrìption")
title = const_convert_to_unicode("tìtle")
tmp_fd, tmp_path = tempfile.mkstemp()
with open(tmp_path, "ab+") as tmp_f:
tmp_f.write(const_convert_to_rawstring('MPEG4\x00\x00'))
tmp_f.flush()
tmp_f.seek(0)
doc = doc_factory.video(self._fake_user, tmp_f, title,
description, keywords)
# do not actually publish the video
doc['pretend'] = 1
webserv.remove_credentials()
try:
webserv.add_document(doc)
# webserv.AuthenticationRequired should be raised
self.assert_(False)
except webserv.AuthenticationRequired:
webserv.add_credentials(self._fake_user, self._fake_pass)
self.assert_(webserv.credentials_available())
self.assertEqual(webserv.get_credentials(), self._fake_user)
# credentials must be valid
webserv.validate_credentials()
# now it should not crash
new_doc = webserv.add_document(doc)
# got the new document back, which is the same plus document_id
finally:
webserv.remove_credentials()
# now check back if document is there
doc_id = new_doc[Document.DOCUMENT_DOCUMENT_ID]
remote_doc = webserv.get_documents_by_id([doc_id])[doc_id]
self.assert_(new_doc.is_video())
self.assert_(remote_doc.is_video())
self.assert_(not remote_doc.is_comment())
self.assert_(not remote_doc.is_icon())
self.assert_(not remote_doc.is_file())
self.assert_(not remote_doc.is_image())
self.assertEqual(new_doc.document_type(), remote_doc.document_type())
self.assertEqual(new_doc.document_id(), remote_doc.document_id())
self.assertEqual(new_doc.repository_id(), self._repository_id)
self.assertEqual(new_doc.repository_id(), remote_doc.repository_id())
self.assertEqual(new_doc.document_keywords(), keywords)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_USERNAME_ID],
remote_doc[DocumentFactory.DOCUMENT_USERNAME_ID])
self.assertEqual(new_doc[Document.DOCUMENT_DOCUMENT_ID],
remote_doc[Document.DOCUMENT_DOCUMENT_ID])
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
description)
self.assertEqual(new_doc[DocumentFactory.DOCUMENT_TITLE_ID],
remote_doc[DocumentFactory.DOCUMENT_TITLE_ID])
self.assertEqual(remote_doc[DocumentFactory.DOCUMENT_TITLE_ID], title)
self.assertEqual(new_doc.document_timestamp(),
remote_doc.document_timestamp())
self.assertEqual(new_doc.document_keywords(),
remote_doc.document_keywords())
# now try to remove
self.assert_(
webserv.remove_document(remote_doc[Document.DOCUMENT_DOCUMENT_ID]))
def test_get_documents(self):
pk = self._real_package_name
webserv = self._factory.new(self._repository_id)
docs = webserv.get_documents([pk], cache = False)
self.assert_(pk in docs)
self.assert_(docs[pk])
for vals in docs.values():
for val in vals:
self.assert_(isinstance(val, Document))
def test_get_comments(self):
pk = self._real_package_name
webserv = self._factory.new(self._repository_id)
docs = webserv.get_comments([pk], cache = False)
self.assert_(pk in docs)
self.assert_(docs[pk])
for vals in docs.values():
for val in vals:
self.assert_(isinstance(val, Document))
def test_report_error(self):
params = {}
params['arch'] = etpConst['currentarch']
params['stacktrace'] = "zomg Vogons!"
params['name'] = "Ford Prefect"
params['email'] = "ford@betelgeuse.gal"
params['version'] = etpConst['entropyversion']
params['errordata'] = "towel forgotten"
params['description'] = "don't panic"
params['arguments'] = ' '.join(sys.argv)
params['uid'] = etpConst['uid']
params['system_version'] = "N/A"
params['system_version'] = "42"
params['processes'] = "none"
params['lsof'] = "none"
params['lspci'] = "none"
params['dmesg'] = "none"
params['locale'] = "Vogonish"
params['repositories.conf'] = "empty"
params['client.conf'] = "---NA---"
webserv = self._factory.new(self._repository_id)
outcome = webserv.report_error(params)
self.assertEqual(outcome, None)
def test_data_send_available(self):
webserv = self._factory.new(self._repository_id)
self.assertEqual(webserv.data_send_available(), True)
def test_factory_comment(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
doc = doc_factory.comment("username", "comment", "title", ["a", "b"])
self.assert_(doc.is_comment())
self.assert_(not doc.is_image())
self.assert_(not doc.is_icon())
self.assert_(not doc.is_file())
self.assert_(not doc.is_video())
self.assertEqual(doc.document_id(), None) # it's new!
self.assertEqual(doc.repository_id(), self._repository_id)
self.assertEqual(doc.document_type(), Document.COMMENT_TYPE_ID)
self.assertEqual(doc.document_keywords(), ["a", "b"])
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID], "username")
self.assertEqual(doc[Document.DOCUMENT_DATA_ID], "comment")
self.assertEqual(doc[DocumentFactory.DOCUMENT_TITLE_ID], "title")
def test_factory_image(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
tmp_fd, tmp_path = tempfile.mkstemp()
try:
with open(tmp_path, "ab+") as tmp_f:
doc = doc_factory.image("username", tmp_f, "title",
"description", ["a", "b"])
self.assert_(not doc.is_comment())
self.assert_(doc.is_image())
self.assert_(not doc.is_icon())
self.assert_(not doc.is_file())
self.assert_(not doc.is_video())
self.assertEqual(doc.document_id(), None) # it's new!
self.assertEqual(doc.repository_id(), self._repository_id)
self.assertEqual(doc.document_type(), Document.IMAGE_TYPE_ID)
self.assertEqual(doc.document_keywords(), ["a", "b"])
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
"username")
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
(os.path.basename(tmp_f.name), tmp_f))
self.assertEqual(doc[DocumentFactory.DOCUMENT_TITLE_ID], "title")
self.assertEqual(doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
"description")
finally:
os.remove(tmp_path)
def test_factory_icon(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
tmp_fd, tmp_path = tempfile.mkstemp()
try:
with open(tmp_path, "ab+") as tmp_f:
doc = doc_factory.icon("username", tmp_f, "title",
"description", ["a", "b"])
self.assert_(not doc.is_comment())
self.assert_(not doc.is_image())
self.assert_(doc.is_icon())
self.assert_(not doc.is_file())
self.assert_(not doc.is_video())
self.assertEqual(doc.document_id(), None) # it's new!
self.assertEqual(doc.repository_id(), self._repository_id)
self.assertEqual(doc.document_type(), Document.ICON_TYPE_ID)
self.assertEqual(doc.document_keywords(), ["a", "b"])
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
"username")
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
(os.path.basename(tmp_f.name), tmp_f))
self.assertEqual(doc[DocumentFactory.DOCUMENT_TITLE_ID],
"title")
self.assertEqual(doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
"description")
finally:
os.remove(tmp_path)
def test_factory_file(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
tmp_fd, tmp_path = tempfile.mkstemp()
try:
with open(tmp_path, "ab+") as tmp_f:
doc = doc_factory.file("username", tmp_f, "title",
"description", ["a", "b"])
self.assert_(not doc.is_comment())
self.assert_(not doc.is_image())
self.assert_(not doc.is_icon())
self.assert_(doc.is_file())
self.assert_(not doc.is_video())
self.assertEqual(doc.document_id(), None) # it's new!
self.assertEqual(doc.repository_id(), self._repository_id)
self.assertEqual(doc.document_type(), Document.FILE_TYPE_ID)
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
"username")
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
(os.path.basename(tmp_f.name), tmp_f))
self.assertEqual(doc[DocumentFactory.DOCUMENT_TITLE_ID],
"title")
self.assertEqual(doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
"description")
self.assertEqual(doc.document_keywords(), ["a", "b"])
finally:
os.remove(tmp_path)
def test_factory_video(self):
webserv = self._factory.new(self._repository_id)
doc_factory = webserv.document_factory()
tmp_fd, tmp_path = tempfile.mkstemp()
try:
with open(tmp_path, "ab+") as tmp_f:
doc = doc_factory.video("username", tmp_f, "title",
"description", ["a", "b"])
self.assert_(not doc.is_comment())
self.assert_(not doc.is_image())
self.assert_(not doc.is_icon())
self.assert_(not doc.is_file())
self.assert_(doc.is_video())
self.assertEqual(doc.document_id(), None) # it's new!
self.assertEqual(doc.repository_id(), self._repository_id)
self.assertEqual(doc.document_type(), Document.VIDEO_TYPE_ID)
self.assertEqual(doc[DocumentFactory.DOCUMENT_USERNAME_ID],
"username")
self.assertEqual(doc[DocumentFactory.DOCUMENT_PAYLOAD_ID],
(os.path.basename(tmp_f.name), tmp_f))
self.assertEqual(doc[DocumentFactory.DOCUMENT_TITLE_ID],
"title")
self.assertEqual(doc[DocumentFactory.DOCUMENT_DESCRIPTION_ID],
"description")
self.assertEqual(doc.document_keywords(), ["a", "b"])
finally:
os.remove(tmp_path)
if __name__ == '__main__':
if "--debug" in sys.argv:
sys.argv.remove("--debug")
from entropy.const import etpUi
etpUi['debug'] = True
unittest.main()
entropy.tools.kill_threads()
raise SystemExit(0)