您有几个问题,让我们按照被提出的顺序依次来看:
- 在Python标准库中,最接近
Bag
的东西是collections.Counter
。然而,它不适用于并发使用,并且具有与您熟悉的ConcurrentBag
类不同的方法。通过从collection.Counter
继承并指定一个自定义元类来设计线程安全的类,可以轻松地重新创建ConcurrentBag
类以在Python中使用。
- 您的线程安全集合的getter可以使用
property
装饰器在Python中复制。由于您正在使信息只读,因此这是实现最简单的方法。您可以在下面的代码中看到该属性,就在Message
类的初始化程序下面。
- 是的,在Python中可以检查两个对象是否相等。您必须定义一个特殊的
__eq__
方法作为类的一部分。示例代码中显示的版本就在上述属性的下面。它首先检查要比较的实例的类型,然后查看每个实例的所有其他属性,但不包括__time
。
- Python没有本地封装类的概念,但是一个简单的元类可以提供非常相似的功能。通过将类的元类设置为
SealedMeta
,任何试图从“封装”类继承的类都将导致引发RuntimeError
。但是,请注意,任何可以访问您的类源代码的人都可以删除其封装标识。
以下类和元类应该解决您遇到的问题。它们如下所示:
SealedMeta
是一个元类,可用于将类标记为已封闭。任何试图从封闭类继承的类都将无法构建,因为会生成 RuntimeError
异常。由于要求非常简单,因此该元类比示例代码中显示的第二个元类简单得多。
Message
是尝试重新创建您在 C# 中编写的类。参数和属性具有略微不同的名称,但基本思想相同。由于 Python 中未静态指定类型,因此此类仅需要一个相等性检查方法。请注意,使用 (metaclass=SealedMeta)
在顶部标记类为已封闭。
AtomicMeta
是一种具有挑战性的元类编写方式,我不确定它是否完全正确。它不仅必须使类的方法线程安全,还必须处理类继承的所有基类。由于这些基类中可能存在旧式的 super
调用,因此元类尝试修复它们所在的模块。
ConcurrentBag
继承自 collections.Counter
,因为它最像其他语言中的 bag。由于您希望该类是线程安全的,因此它必须使用 AtomicMeta
元类,以便将其及其父类的方法更改为原子操作。引入了一个 to_array
方法来改进其 API。
不再拖延了,这是上面提到的代码。希望它能对你和其他人有所帮助:
import builtins
import collections
import functools
import inspect
import threading
class SealedMeta(type):
"""SealedMeta(name, bases, dictionary) -> new sealed class"""
__REGISTRY = ()
def __new__(mcs, name, bases, dictionary):
"""Create a new class only if it is not related to a sealed class."""
if any(issubclass(base, mcs.__REGISTRY) for base in bases):
raise RuntimeError('no class may inherit from a sealed class')
mcs.__REGISTRY += (super().__new__(mcs, name, bases, dictionary),)
return mcs.__REGISTRY[-1]
class Message(metaclass=SealedMeta):
"""Message(identifier, message, source, kind, time) -> Message instance"""
def __init__(self, identifier, message, source, kind, time):
"""Initialize all the attributes in a new Message instance."""
self.__identifier = identifier
self.__message = message
self.__source = source
self.__kind = kind
self.__time = time
self.__destination = ConcurrentBag()
@property
def destination(self):
"""Destination property containing serialized Employee instances."""
return self.__destination.to_array()
def __eq__(self, other):
"""Return if this instance has the same data as the other instance."""
return isinstance(other, type(self)) and \
self.__identifier == other.__identifier and \
self.__message == other.__message and \
self.__source == other.__source and \
self.__kind == other.__kind and \
self.__destination == other.__destination
class AtomicMeta(type):
"""AtomicMeta(name, bases, dictionary) -> new thread-safe class"""
__REGISTRY = {}
def __new__(mcs, name, bases, dictionary, parent=None):
"""Create a new class while fixing bases and all callable items."""
final_bases = []
for base in bases:
fixed = mcs.__REGISTRY.get(base)
if fixed:
final_bases.append(fixed)
elif base in mcs.__REGISTRY.values():
final_bases.append(base)
elif base in vars(builtins).values():
final_bases.append(base)
else:
final_bases.append(mcs(
base.__name__, base.__bases__, dict(vars(base)), base
))
class_lock = threading.Lock()
for key, value in dictionary.items():
if callable(value):
dictionary[key] = mcs.__wrap(value, class_lock)
new_class = super().__new__(mcs, name, tuple(final_bases), dictionary)
if parent is None:
mcs.__REGISTRY[object()] = new_class
else:
mcs.__REGISTRY[parent] = new_class
source = inspect.getmodule(parent)
*prefix, root = parent.__qualname__.split('.')
for name in prefix:
source = getattr(source, name)
setattr(source, root, new_class)
return new_class
def __init__(cls, name, bases, dictionary, parent=None):
"""Initialize the new class while ignoring any potential parent."""
super().__init__(name, bases, dictionary)
@staticmethod
def __wrap(method, class_lock):
"""Ensure that all method calls are run as atomic operations."""
@functools.wraps(method)
def atomic_wrapper(self, *args, **kwargs):
with class_lock:
try:
instance_lock = self.__lock
except AttributeError:
instance_lock = self.__lock = threading.RLock()
with instance_lock:
return method(self, *args, **kwargs)
return atomic_wrapper
class ConcurrentBag(collections.Counter, metaclass=AtomicMeta):
"""ConcurrentBag() -> ConcurrentBag instance"""
def to_array(self):
"""Serialize the data in the ConcurrentBag instance."""
return tuple(key for key, value in self.items() for _ in range(value))
在进行一些研究后,你可能会发现有一些封装类的替代方法。在Java中,类被标记为final
,而不是使用sealed
关键字。搜索替代关键字可能会带你到Martijn Pieters' answer,他介绍了一个稍微简单一点的元类来封装Python中的类。你可以使用以下受他启发的元类,以与代码中当前使用的SealedMeta
相同的方式使用:
class Final(type):
"""Final(name, bases, dictionary) -> new final class"""
def __new__(mcs, name, bases, dictionary):
"""Create a new class if none of its bases are marked as final."""
if any(isinstance(base, mcs) for base in bases):
raise TypeError('no class may inherit from a final class')
return super().__new__(mcs, name, bases, dictionary)
Final
元类非常有用,如果您想封闭一个类并防止其他人从中继承,但是如果一个类的属性打算是最终的,则需要不同的方法。在这种情况下,Access
元类可以非常有用。它受到这个答案的启发,该答案回答了如何在Python中防止函数覆盖的问题。下面显示的元类采用了修改后的方法,旨在考虑各种属性。
import collections
import threading
class RLock(threading._RLock):
"""RLock() -> RLock instance with count property"""
@property
def count(self):
"""Count property showing current level of lock ownership."""
return self._count
class Access(type):
"""Access(name, bases, dictionary) -> class supporting final attributes"""
__MUTEX = RLock()
__FINAL = []
@classmethod
def __prepare__(mcs, name, bases, **keywords):
"""Begin construction of a class and check for possible deadlocks."""
if not mcs.__MUTEX.acquire(True, 10):
raise RuntimeError('please check your code for deadlocks')
return super().__prepare__(mcs, name, bases, **keywords)
@classmethod
def final(mcs, attribute):
"""Record an attribute as being final so it cannot be overridden."""
with mcs.__MUTEX:
if any(attribute is final for final in mcs.__FINAL):
raise SyntaxError('attributes may be marked final only once')
mcs.__FINAL.append(attribute)
return attribute
def __new__(mcs, class_name, bases, dictionary):
"""Create a new class that supports the concept of final attributes."""
classes, visited, names = collections.deque(bases), set(), set()
while classes:
base = classes.popleft()
if base not in visited:
visited.add(base)
classes.extend(base.__bases__)
names.update(getattr(base, '__final_attributes__', ()))
if any(name in names for name in dictionary):
raise SyntaxError('final attributes may not be overridden')
names.clear()
for name, attribute in dictionary.items():
for index, final in enumerate(mcs.__FINAL):
if attribute is final:
del mcs.__FINAL[index]
names.add(name)
break
if mcs.__MUTEX.count == 1 and mcs.__FINAL:
raise RuntimeError('final decorator has not been used correctly')
mcs.__MUTEX.release()
dictionary['__final_attributes__'] = frozenset(names)
return super().__new__(mcs, class_name, bases, dictionary)
作为使用
Access
元类的演示,此示例将引发
SyntaxError
:
class Parent(metaclass=Access):
def __init__(self, a, b):
self.__a = a
self.__b = b
@Access.final
def add(self):
return self.__a + self.__b
def sub(self):
return self.__a - self.__b
class Child(Parent):
def __init__(self, a, b, c):
super().__init__(a, b)
self.__c = c
def add(self):
return super().add() + self.__c
#亲爱的未来维护者,请不要子类化这个类
。;-) - Kevin__eq__
。 - NightShadeQueen