파이썬 질문입니다.
조회수 1233회
class RunSignatures(object): """Run Signatures."""
def __init__(self, results):
self.results = results
self.matched = []
# While developing our version is generally something along the lines
# of "2.0-dev" whereas StrictVersion() does not handle "-dev", so we
# strip that part off.
self.version = CUCKOO_VERSION.split("-")[0]
# Gather all enabled, up-to-date, and applicable signatures.
self.signatures = []
for signature in list_plugins(group="signatures"):
if self._should_enable_signature(signature):
self.signatures.append(signature(self))
# Signatures to call per API name.
self.api_sigs = {}
def _should_enable_signature(self, signature):
"""Should the given signature be enabled for this analysis?"""
if not signature.enabled:
return False
if not self.check_signature_version(signature):
return False
# Network and/or cross-platform signatures.
if not signature.platform:
return True
task_platform = self.results.get("info", {}).get("platform")
# Windows is implied when a platform has not been specified during the
# submission of a sample, but for other platforms the platform has to
# be explicitly stated.
if not task_platform and signature.platform == "windows":
return True
return task_platform == signature.platform
def check_signature_version(self, signature):
"""Check signature version.
@param current: signature class/instance to check.
@return: check result.
"""
# Check the minimum Cuckoo version for this signature, if provided.
if signature.minimum:
try:
# If the running Cuckoo is older than the required minimum
# version, skip this signature.
if StrictVersion(self.version) < StrictVersion(signature.minimum):
log.debug("You are running an older incompatible version "
"of Cuckoo, the signature \"%s\" requires "
"minimum version %s.",
signature.name, signature.minimum)
return False
if StrictVersion("1.2") > StrictVersion(signature.minimum):
log.warn("Cuckoo signature style has been redesigned in "
"cuckoo 1.2. This signature is not "
"compatible: %s.", signature.name)
return False
if StrictVersion("2.0") > StrictVersion(signature.minimum):
log.warn("Cuckoo version 2.0 features a lot of changes that "
"render old signatures ineffective as they are not "
"backwards-compatible. Please upgrade this "
"signature: %s.", signature.name)
return False
if hasattr(signature, "run"):
log.warn("This signatures features one or more deprecated "
"functions which indicates that it is very likely "
"an old-style signature. Please upgrade this "
"signature: %s.", signature.name)
return False
except ValueError:
log.debug("Wrong minor version number in signature %s",
signature.name)
return False
# Check the maximum version of Cuckoo for this signature, if provided.
if signature.maximum:
try:
# If the running Cuckoo is newer than the required maximum
# version, skip this signature.
if StrictVersion(self.version) > StrictVersion(signature.maximum):
log.debug("You are running a newer incompatible version "
"of Cuckoo, the signature \"%s\" requires "
"maximum version %s.",
signature.name, signature.maximum)
return False
except ValueError:
log.debug("Wrong major version number in signature %s",
signature.name)
return False
return True
def call_signature(self, signature, handler, *args, **kwargs):
"""Wrapper to call into 3rd party signatures. This wrapper yields the
event to the signature and handles matched signatures recursively."""
try:
if handler(*args, **kwargs):
signature.matched = True
for sig in self.signatures:
self.call_signature(sig, sig.on_signature, signature)
except NotImplementedError:
return False
except:
log.exception("Failed to run '%s' of the %s signature",
handler.__name__, signature.name)
return True
def init_api_sigs(self, apiname, category):
"""Initialize a list of signatures for which we should trigger its
on_call method for this particular API name and category."""
self.api_sigs[apiname] = []
for sig in self.signatures:
if sig.filter_apinames and apiname not in sig.filter_apinames:
continue
if sig.filter_categories and category not in sig.filter_categories:
continue
self.api_sigs[apiname].append(sig)
def yield_calls(self, proc):
"""Yield calls of interest to each interested signature."""
for idx, call in enumerate(proc.get("calls", [])):
# Initialize a list of signatures to call for this API call.
if call["api"] not in self.api_sigs:
self.init_api_sigs(call["api"], call.get("category"))
# See the following SO answer on why we're using reversed() here.
# http://stackoverflow.com/a/10665800
for sig in reversed(self.api_sigs[call["api"]]):
sig.cid, sig.call = idx, call
if self.call_signature(sig, sig.on_call, call, proc) is False:
self.api_sigs[call["api"]].remove(sig)
def run(self):
"""Run signatures."""
# Allow signatures to initialize themselves.
for signature in self.signatures:
signature.init()
log.debug("Running %d signatures", len(self.signatures))
# Iterate calls and tell interested signatures about them.
for proc in self.results.get("behavior", {}).get("processes", []):
# Yield the new process event.
for sig in self.signatures:
sig.pid = proc["pid"]
self.call_signature(sig, sig.on_process, proc)
self.yield_calls(proc)
# Yield completion events to each signature.
for sig in self.signatures:
self.call_signature(sig, sig.on_complete)
score = 0
for signature in self.signatures:
if signature.matched:
log.debug("Analysis matched signature: %s", signature.name)
self.matched.append(signature.results())
score += signature.severity
# Sort the matched signatures by their severity level and put them
# into the results dictionary.
self.matched.sort(key=lambda key: key["severity"])
self.results["signatures"] = self.matched
if "info" in self.results:
self.results["info"]["score"] = score / 5.0
이 코드에서 yield_calls, init_api_sigs, call_signature 이 부분들이 이해가 안되는데 설명해주실분 계신가여??
댓글 입력