From a90e450433864e2ab044aba992c90b8a5da6f9ce Mon Sep 17 00:00:00 2001 From: Fabio Erculiani Date: Tue, 1 Nov 2011 13:26:52 +0100 Subject: [PATCH] [entropy.transceivers] add lock() method, bumpi API rev --- .../plugins/interfaces/file_plugin.py | 45 +++++++++++++++- .../plugins/interfaces/ftp_plugin.py | 54 ++++++++++++++++++- .../plugins/interfaces/ssh_plugin.py | 37 ++++++++++++- lib/entropy/transceivers/uri_handlers/skel.py | 20 ++++++- 4 files changed, 149 insertions(+), 7 deletions(-) diff --git a/lib/entropy/transceivers/uri_handlers/plugins/interfaces/file_plugin.py b/lib/entropy/transceivers/uri_handlers/plugins/interfaces/file_plugin.py index 187ae43e0..159b2e04e 100644 --- a/lib/entropy/transceivers/uri_handlers/plugins/interfaces/file_plugin.py +++ b/lib/entropy/transceivers/uri_handlers/plugins/interfaces/file_plugin.py @@ -14,8 +14,10 @@ import os import pwd import grp import shutil +import errno +import fcntl -from entropy.const import const_setup_perms, etpConst +from entropy.const import const_setup_perms, etpConst, const_debug_write from entropy.transceivers.uri_handlers.skel import EntropyUriHandler from entropy.tools import md5sum @@ -91,7 +93,7 @@ class EntropyFileUriHandler(EntropyUriHandler): EntropyUriHandler based FILE (local) transceiver plugin. """ - PLUGIN_API_VERSION = 3 + PLUGIN_API_VERSION = 4 @staticmethod def approve_uri(uri): @@ -150,6 +152,45 @@ class EntropyFileUriHandler(EntropyUriHandler): os.rename(tmp_remote_str, remote_str) return True + def lock(self, remote_path): + remote_str = self._setup_remote_path(remote_path) + remote_str_lock = os.path.join( + os.path.dirname(remote_str), + "." + os.path.basename(remote_str) + ".lock") + const_debug_write(__name__, + "lock(): remote_str: %s, lock: %s" % ( + remote_str, remote_str_lock,)) + + # Use low level IO because Python open() and with stmt + # do unwanted things like creating the file on close() or + # context exit. I didn't investigate nor I do care actually. + lock_fd = None + try: + lock_fd = os.open(remote_str_lock, os.O_RDWR | os.O_CREAT) + try: + fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError as err: + if err.errno not in (errno.EACCES, errno.EAGAIN,): + # ouch, wtf? + raise + return False + # create file + if os.path.isfile(remote_str): + # locked, ouch + return False + # we run in mutual exclusion, so it's safe + # to do test-and-set here + with open(remote_str, "wb") as remote_f: + pass + # cleanup the lock file + os.remove(remote_str_lock) + # release the resource + fcntl.flock(lock_fd, fcntl.LOCK_UN) + return True + finally: + if lock_fd is not None: + os.close(lock_fd) + def upload_many(self, load_path_list, remote_dir): for load_path in load_path_list: remote_path = os.path.join(remote_dir, os.path.basename(load_path)) diff --git a/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ftp_plugin.py b/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ftp_plugin.py index ad697f8a2..d2eef1d3f 100644 --- a/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ftp_plugin.py +++ b/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ftp_plugin.py @@ -12,7 +12,9 @@ import os import time import socket +import tempfile +from entropy.const import const_debug_write from entropy.tools import print_traceback, get_file_size, \ convert_seconds_to_fancy_output, bytes_into_human, spliturl from entropy.output import blue, brown, darkgreen, red @@ -27,7 +29,7 @@ class EntropyFtpUriHandler(EntropyUriHandler): EntropyUriHandler based FTP transceiver plugin. """ - PLUGIN_API_VERSION = 3 + PLUGIN_API_VERSION = 4 _DEFAULT_TIMEOUT = 60 @@ -331,6 +333,9 @@ class EntropyFtpUriHandler(EntropyUriHandler): def _mkdir(self, directory): return self.__ftpconn.mkd(directory) + def _rmdir(self, directory): + return self.__ftpconn.rmd(directory) + def download(self, remote_path, save_path): self.__connect_if_not() @@ -465,6 +470,53 @@ class EntropyFtpUriHandler(EntropyUriHandler): self.delete(tmp_path) self.delete(path) + def lock(self, remote_path): + # The only atomic operation on FTP seems to be mkdir() + # But there is no actual guarantee because it really depends + # on the server implementation. + # FTP is very old, got to live with it. + self.__connect_if_not() + + remote_path_lock = os.path.join( + os.path.dirname(remote_path), + "." + os.path.basename(remote_path) + ".lock") + remote_ptr = os.path.join(self.__ftpdir, remote_path) + remote_ptr_lock = os.path.join(self.__ftpdir, remote_path_lock) + + const_debug_write(__name__, + "lock(): remote_ptr: %s, lock: %s" % ( + remote_ptr, remote_ptr_lock,)) + + try: + self._mkdir(remote_ptr_lock) + except self.ftplib.error_perm as e: + return False + + # now we can create the lock file reliably + tmp_fd, tmp_path = None, None + try: + tmp_fd, tmp_path = tempfile.mkstemp(prefix="entropy.txc.ftp.lock") + # check if remote_ptr is already there + if self._is_path_available(remote_ptr): + return False + with open(tmp_path, "rb") as f: + rc = self.__ftpconn.storbinary( + "STOR " + remote_ptr, f) + done = rc.find("226") != -1 + if not done: + # wtf? + return False + return True + finally: + if tmp_fd is not None: + os.close(tmp_fd) + if tmp_path is not None: + os.remove(tmp_path) + # and always remove the directory created with _mkdir() + # we hope that, if we were able to create it, we're also + # able to remove it. + self._rmdir(remote_ptr_lock) + def upload_many(self, load_path_list, remote_dir): for load_path in load_path_list: remote_path = os.path.join(remote_dir, os.path.basename(load_path)) diff --git a/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ssh_plugin.py b/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ssh_plugin.py index bf7662550..5baea397e 100644 --- a/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ssh_plugin.py +++ b/lib/entropy/transceivers/uri_handlers/plugins/interfaces/ssh_plugin.py @@ -9,6 +9,7 @@ B{EntropyTransceiver SSH URI Handler module}. """ +import re import os import errno import time @@ -16,7 +17,8 @@ import tempfile import shutil import codecs -from entropy.const import const_isnumber, const_debug_write +from entropy.const import const_isnumber, const_debug_write, \ + etpConst from entropy.output import brown, darkgreen, teal from entropy.i18n import _ from entropy.transceivers.exceptions import TransceiverConnectionError @@ -28,7 +30,7 @@ class EntropySshUriHandler(EntropyUriHandler): EntropyUriHandler based SSH (with pubkey) transceiver plugin. """ - PLUGIN_API_VERSION = 3 + PLUGIN_API_VERSION = 4 _DEFAULT_TIMEOUT = 60 _DEFAULT_PORT = 22 @@ -291,6 +293,37 @@ class EntropySshUriHandler(EntropyUriHandler): # atomic rename return self.rename(tmp_remote_path, remote_path) + valid_lock_path = re.compile("^([A-Za-z0-9/\.:\-_~]+)$") + def lock(self, remote_path): + + # we trust dir but not remote_path, because we do + # shell code below. + reg = EntropySshUriHandler.valid_lock_path + if not reg.match(remote_path): + raise ValueError("illegal lock path") + remote_ptr = os.path.join(self.__dir, remote_path) + + remote_ptr_lock = os.path.join( + self.__dir, os.path.dirname(remote_path), + "." + os.path.basename(remote_path)) + remote_ptr_lock += ".lock" + const_debug_write(__name__, + "lock(): remote_ptr: %s, lock: %s" % ( + remote_ptr, remote_ptr_lock,)) + + args, remote_str = self._setup_fs_args() + lock_cmd = '( flock -x -n 9; if [ "${?}" != "0" ]; ' + \ + 'then echo -n "FAIL"; else if [ -f ' + remote_ptr + ' ]; then ' + \ + 'echo -n "FAIL"; else touch ' + remote_ptr + ' && ' + \ + 'rm ' + remote_ptr_lock + ' && echo -n "OK"; fi; fi ) 9> ' \ + + remote_ptr_lock + args += [remote_str, lock_cmd] + exec_rc, output, error = self._exec_cmd(args) + const_debug_write(__name__, + "lock(), outcome: lock: %s, rc: %s, out: %s, err: %s" % ( + remote_ptr_lock, exec_rc, output, error,)) + return output == "OK" + def upload_many(self, load_path_list, remote_dir): def do_rm(path): diff --git a/lib/entropy/transceivers/uri_handlers/skel.py b/lib/entropy/transceivers/uri_handlers/skel.py index c5660c82c..3b909bd05 100644 --- a/lib/entropy/transceivers/uri_handlers/skel.py +++ b/lib/entropy/transceivers/uri_handlers/skel.py @@ -19,11 +19,11 @@ class EntropyUriHandler(TextInterface): a common API for implementing custom URI handlers. To add your URI handler to EntropyTransceiver, do the following: - >>> EntropyTransceiver.add_uri_handler(my_entropy_transceiver_based_instance) + >>> EntropyTransceiver.add_uri_handler(entropy_transceiver_based_instance) "add_uri_handler" is a EntropyTransceiver static method. """ - BASE_PLUGIN_API_VERSION = 3 + BASE_PLUGIN_API_VERSION = 4 TMP_TXC_FILE_EXT = ".tmp-entropy-txc" @@ -208,6 +208,22 @@ class EntropyUriHandler(TextInterface): """ raise NotImplementedError() + def lock(self, remote_path): + """ + Create remote "lock" file atomically. + To drop a lock, just call remove(). + The goal here is just being able to create a remote file + in mutual exclusion between other Entropy Server instances. + Please note: the locking mechanism is guaranteed to work + only when callers share the same transceiver plugin. + + @param remote_path: remote path to file lock + @type remote_path: string + @return: True, if lock has been created, False otherwise + @rtype: bool + """ + raise NotImplementedError() + def upload_many(self, load_path_list, remote_dir): """ Upload many files at once, taken from load_path_list, stored into