[entropy.transceivers] add lock() method, bumpi API rev
This commit is contained in:
@@ -14,8 +14,10 @@ import os
|
||||
import pwd
|
||||
import grp
|
||||
import shutil
|
||||
import errno
|
||||
import fcntl
|
||||
|
||||
from entropy.const import const_setup_perms, etpConst
|
||||
from entropy.const import const_setup_perms, etpConst, const_debug_write
|
||||
from entropy.transceivers.uri_handlers.skel import EntropyUriHandler
|
||||
from entropy.tools import md5sum
|
||||
|
||||
@@ -91,7 +93,7 @@ class EntropyFileUriHandler(EntropyUriHandler):
|
||||
EntropyUriHandler based FILE (local) transceiver plugin.
|
||||
"""
|
||||
|
||||
PLUGIN_API_VERSION = 3
|
||||
PLUGIN_API_VERSION = 4
|
||||
|
||||
@staticmethod
|
||||
def approve_uri(uri):
|
||||
@@ -150,6 +152,45 @@ class EntropyFileUriHandler(EntropyUriHandler):
|
||||
os.rename(tmp_remote_str, remote_str)
|
||||
return True
|
||||
|
||||
def lock(self, remote_path):
|
||||
remote_str = self._setup_remote_path(remote_path)
|
||||
remote_str_lock = os.path.join(
|
||||
os.path.dirname(remote_str),
|
||||
"." + os.path.basename(remote_str) + ".lock")
|
||||
const_debug_write(__name__,
|
||||
"lock(): remote_str: %s, lock: %s" % (
|
||||
remote_str, remote_str_lock,))
|
||||
|
||||
# Use low level IO because Python open() and with stmt
|
||||
# do unwanted things like creating the file on close() or
|
||||
# context exit. I didn't investigate nor I do care actually.
|
||||
lock_fd = None
|
||||
try:
|
||||
lock_fd = os.open(remote_str_lock, os.O_RDWR | os.O_CREAT)
|
||||
try:
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
except IOError as err:
|
||||
if err.errno not in (errno.EACCES, errno.EAGAIN,):
|
||||
# ouch, wtf?
|
||||
raise
|
||||
return False
|
||||
# create file
|
||||
if os.path.isfile(remote_str):
|
||||
# locked, ouch
|
||||
return False
|
||||
# we run in mutual exclusion, so it's safe
|
||||
# to do test-and-set here
|
||||
with open(remote_str, "wb") as remote_f:
|
||||
pass
|
||||
# cleanup the lock file
|
||||
os.remove(remote_str_lock)
|
||||
# release the resource
|
||||
fcntl.flock(lock_fd, fcntl.LOCK_UN)
|
||||
return True
|
||||
finally:
|
||||
if lock_fd is not None:
|
||||
os.close(lock_fd)
|
||||
|
||||
def upload_many(self, load_path_list, remote_dir):
|
||||
for load_path in load_path_list:
|
||||
remote_path = os.path.join(remote_dir, os.path.basename(load_path))
|
||||
|
||||
@@ -12,7 +12,9 @@
|
||||
import os
|
||||
import time
|
||||
import socket
|
||||
import tempfile
|
||||
|
||||
from entropy.const import const_debug_write
|
||||
from entropy.tools import print_traceback, get_file_size, \
|
||||
convert_seconds_to_fancy_output, bytes_into_human, spliturl
|
||||
from entropy.output import blue, brown, darkgreen, red
|
||||
@@ -27,7 +29,7 @@ class EntropyFtpUriHandler(EntropyUriHandler):
|
||||
EntropyUriHandler based FTP transceiver plugin.
|
||||
"""
|
||||
|
||||
PLUGIN_API_VERSION = 3
|
||||
PLUGIN_API_VERSION = 4
|
||||
|
||||
_DEFAULT_TIMEOUT = 60
|
||||
|
||||
@@ -331,6 +333,9 @@ class EntropyFtpUriHandler(EntropyUriHandler):
|
||||
def _mkdir(self, directory):
|
||||
return self.__ftpconn.mkd(directory)
|
||||
|
||||
def _rmdir(self, directory):
|
||||
return self.__ftpconn.rmd(directory)
|
||||
|
||||
def download(self, remote_path, save_path):
|
||||
|
||||
self.__connect_if_not()
|
||||
@@ -465,6 +470,53 @@ class EntropyFtpUriHandler(EntropyUriHandler):
|
||||
self.delete(tmp_path)
|
||||
self.delete(path)
|
||||
|
||||
def lock(self, remote_path):
|
||||
# The only atomic operation on FTP seems to be mkdir()
|
||||
# But there is no actual guarantee because it really depends
|
||||
# on the server implementation.
|
||||
# FTP is very old, got to live with it.
|
||||
self.__connect_if_not()
|
||||
|
||||
remote_path_lock = os.path.join(
|
||||
os.path.dirname(remote_path),
|
||||
"." + os.path.basename(remote_path) + ".lock")
|
||||
remote_ptr = os.path.join(self.__ftpdir, remote_path)
|
||||
remote_ptr_lock = os.path.join(self.__ftpdir, remote_path_lock)
|
||||
|
||||
const_debug_write(__name__,
|
||||
"lock(): remote_ptr: %s, lock: %s" % (
|
||||
remote_ptr, remote_ptr_lock,))
|
||||
|
||||
try:
|
||||
self._mkdir(remote_ptr_lock)
|
||||
except self.ftplib.error_perm as e:
|
||||
return False
|
||||
|
||||
# now we can create the lock file reliably
|
||||
tmp_fd, tmp_path = None, None
|
||||
try:
|
||||
tmp_fd, tmp_path = tempfile.mkstemp(prefix="entropy.txc.ftp.lock")
|
||||
# check if remote_ptr is already there
|
||||
if self._is_path_available(remote_ptr):
|
||||
return False
|
||||
with open(tmp_path, "rb") as f:
|
||||
rc = self.__ftpconn.storbinary(
|
||||
"STOR " + remote_ptr, f)
|
||||
done = rc.find("226") != -1
|
||||
if not done:
|
||||
# wtf?
|
||||
return False
|
||||
return True
|
||||
finally:
|
||||
if tmp_fd is not None:
|
||||
os.close(tmp_fd)
|
||||
if tmp_path is not None:
|
||||
os.remove(tmp_path)
|
||||
# and always remove the directory created with _mkdir()
|
||||
# we hope that, if we were able to create it, we're also
|
||||
# able to remove it.
|
||||
self._rmdir(remote_ptr_lock)
|
||||
|
||||
def upload_many(self, load_path_list, remote_dir):
|
||||
for load_path in load_path_list:
|
||||
remote_path = os.path.join(remote_dir, os.path.basename(load_path))
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
B{EntropyTransceiver SSH URI Handler module}.
|
||||
|
||||
"""
|
||||
import re
|
||||
import os
|
||||
import errno
|
||||
import time
|
||||
@@ -16,7 +17,8 @@ import tempfile
|
||||
import shutil
|
||||
import codecs
|
||||
|
||||
from entropy.const import const_isnumber, const_debug_write
|
||||
from entropy.const import const_isnumber, const_debug_write, \
|
||||
etpConst
|
||||
from entropy.output import brown, darkgreen, teal
|
||||
from entropy.i18n import _
|
||||
from entropy.transceivers.exceptions import TransceiverConnectionError
|
||||
@@ -28,7 +30,7 @@ class EntropySshUriHandler(EntropyUriHandler):
|
||||
EntropyUriHandler based SSH (with pubkey) transceiver plugin.
|
||||
"""
|
||||
|
||||
PLUGIN_API_VERSION = 3
|
||||
PLUGIN_API_VERSION = 4
|
||||
|
||||
_DEFAULT_TIMEOUT = 60
|
||||
_DEFAULT_PORT = 22
|
||||
@@ -291,6 +293,37 @@ class EntropySshUriHandler(EntropyUriHandler):
|
||||
# atomic rename
|
||||
return self.rename(tmp_remote_path, remote_path)
|
||||
|
||||
valid_lock_path = re.compile("^([A-Za-z0-9/\.:\-_~]+)$")
|
||||
def lock(self, remote_path):
|
||||
|
||||
# we trust dir but not remote_path, because we do
|
||||
# shell code below.
|
||||
reg = EntropySshUriHandler.valid_lock_path
|
||||
if not reg.match(remote_path):
|
||||
raise ValueError("illegal lock path")
|
||||
remote_ptr = os.path.join(self.__dir, remote_path)
|
||||
|
||||
remote_ptr_lock = os.path.join(
|
||||
self.__dir, os.path.dirname(remote_path),
|
||||
"." + os.path.basename(remote_path))
|
||||
remote_ptr_lock += ".lock"
|
||||
const_debug_write(__name__,
|
||||
"lock(): remote_ptr: %s, lock: %s" % (
|
||||
remote_ptr, remote_ptr_lock,))
|
||||
|
||||
args, remote_str = self._setup_fs_args()
|
||||
lock_cmd = '( flock -x -n 9; if [ "${?}" != "0" ]; ' + \
|
||||
'then echo -n "FAIL"; else if [ -f ' + remote_ptr + ' ]; then ' + \
|
||||
'echo -n "FAIL"; else touch ' + remote_ptr + ' && ' + \
|
||||
'rm ' + remote_ptr_lock + ' && echo -n "OK"; fi; fi ) 9> ' \
|
||||
+ remote_ptr_lock
|
||||
args += [remote_str, lock_cmd]
|
||||
exec_rc, output, error = self._exec_cmd(args)
|
||||
const_debug_write(__name__,
|
||||
"lock(), outcome: lock: %s, rc: %s, out: %s, err: %s" % (
|
||||
remote_ptr_lock, exec_rc, output, error,))
|
||||
return output == "OK"
|
||||
|
||||
def upload_many(self, load_path_list, remote_dir):
|
||||
|
||||
def do_rm(path):
|
||||
|
||||
@@ -19,11 +19,11 @@ class EntropyUriHandler(TextInterface):
|
||||
a common API for implementing custom URI handlers.
|
||||
|
||||
To add your URI handler to EntropyTransceiver, do the following:
|
||||
>>> EntropyTransceiver.add_uri_handler(my_entropy_transceiver_based_instance)
|
||||
>>> EntropyTransceiver.add_uri_handler(entropy_transceiver_based_instance)
|
||||
"add_uri_handler" is a EntropyTransceiver static method.
|
||||
"""
|
||||
|
||||
BASE_PLUGIN_API_VERSION = 3
|
||||
BASE_PLUGIN_API_VERSION = 4
|
||||
|
||||
TMP_TXC_FILE_EXT = ".tmp-entropy-txc"
|
||||
|
||||
@@ -208,6 +208,22 @@ class EntropyUriHandler(TextInterface):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def lock(self, remote_path):
|
||||
"""
|
||||
Create remote "lock" file atomically.
|
||||
To drop a lock, just call remove().
|
||||
The goal here is just being able to create a remote file
|
||||
in mutual exclusion between other Entropy Server instances.
|
||||
Please note: the locking mechanism is guaranteed to work
|
||||
only when callers share the same transceiver plugin.
|
||||
|
||||
@param remote_path: remote path to file lock
|
||||
@type remote_path: string
|
||||
@return: True, if lock has been created, False otherwise
|
||||
@rtype: bool
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def upload_many(self, load_path_list, remote_dir):
|
||||
"""
|
||||
Upload many files at once, taken from load_path_list, stored into
|
||||
|
||||
Reference in New Issue
Block a user