Source code for cybsi.cloud.insight.tasks
import uuid
from typing import Iterable, List, Optional, cast
from enum_tools import document_enum
from ..enum import CybsiAPIEnum
from ..internal import BaseAPI, BaseAsyncAPI, JsonObject, JsonObjectForm, JsonObjectView
_PATH = "/insight/tasks"
[docs]
@document_enum
class ObjectKeyType(CybsiAPIEnum):
"""Object key type."""
MD5Hash = "MD5Hash"
SHA1Hash = "SHA1Hash"
SHA256Hash = "SHA256Hash"
SHA512Hash = "SHA512Hash"
DomainName = "DomainName"
URL = "URL"
IPAddress = "IPAddress"
[docs]
@document_enum
class ObjectType(CybsiAPIEnum):
"""Object type."""
File = "File"
DomainName = "DomainName"
URL = "URL"
IPAddress = "IPAddress"
[docs]
@document_enum
class TaskState(CybsiAPIEnum):
"""Object key type."""
Pending = "Pending"
"""Task awaits execution."""
Completed = "Completed"
"""Task successfully completed."""
Failed = "Failed"
"""Task completed with error."""
[docs]
class TaskAPI(BaseAPI):
"""Task API."""
[docs]
def register(self, task: "TaskForm") -> "TaskRegistrationView":
"""Register new enrichment task.
Note:
Calls `POST /insight/tasks`.
Args:
task: Task registration form.
Returns:
Task registration view.
Raises:
:class:`~cybsi.cloud.error.InvalidRequestError`:
Provided values are invalid (see form value requirements).
:class:`~cybsi.cloud.error.SemanticError`: Form contains logic errors.
:class:`~cybsi.cloud.error.TooManyRequestsError`: Request limit exceeded.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.SchemaNotFound`
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.FileNotFound`
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeySet`
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeyFormat`
Too many requests error codes specific for this method:
* :attr:`~cybsi.cloud.error.TooManyRequestsErrorCodes.LimitExceeded`
"""
resp = self._connector.do_post(path=_PATH, json=task.json())
return TaskRegistrationView(resp.json())
[docs]
def view(self, task_id: uuid.UUID) -> "TaskView":
"""Get the enrichment task view.
Note:
Calls `GET /insight/tasks/{task_id}`.
Args:
task_id: Task identifier.
Returns:
Task view.
Raises:
:class:`~cybsi.cloud.error.NotFoundError`: Task not found.
"""
path = f"{_PATH}/{task_id}"
resp = self._connector.do_get(path=path)
return TaskView(resp.json())
[docs]
class TaskAsyncAPI(BaseAsyncAPI):
"""Tasks asynchronous API."""
[docs]
async def register(self, task: "TaskForm") -> "TaskRegistrationView":
"""Register new enrichment task.
Note:
Calls `POST /insight/tasks`.
Args:
task: Task registration form.
Returns:
Task registration view.
Raises:
:class:`~cybsi.cloud.error.InvalidRequestError`:
Provided values are invalid (see form value requirements).
:class:`~cybsi.cloud.error.SemanticError`: Form contains logic errors.
:class:`~cybsi.cloud.error.TooManyRequestsError`: Request limit exceeded.
Note:
Semantic error codes specific for this method:
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.SchemaNotFound`
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.FileNotFound`
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeySet`
* :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeyFormat`
Too many requests error codes specific for this method:
* :attr:`~cybsi.cloud.error.TooManyRequestsErrorCodes.LimitExceeded`
"""
resp = await self._connector.do_post(path=_PATH, json=task.json())
return TaskRegistrationView(resp.json())
[docs]
async def view(self, task_id: uuid.UUID) -> "TaskView":
"""Get the enrichment task view.
Note:
Calls `GET /insight/tasks/{task_id}`.
Args:
task_id: Task identifier.
Returns:
Task view.
Raises:
:class:`~cybsi.cloud.error.NotFoundError`: Task not found.
"""
path = f"{_PATH}/{task_id}"
resp = await self._connector.do_get(path=path)
return TaskView(resp.json())
[docs]
class TaskRegistrationView(JsonObjectView):
"""Task registration view"""
@property
def id(self) -> uuid.UUID:
"""Task identifier."""
return uuid.UUID(self._get("taskID"))
[docs]
class ObjectKeyView(JsonObjectView):
"""Object key view"""
@property
def type(self) -> ObjectKeyType:
"""Object key type"""
return ObjectKeyType.from_string(self._get("type"))
@property
def value(self) -> str:
"""Key value."""
return self._get("value")
[docs]
class TaskParamsView(JsonObjectView):
"""Task params view."""
@property
def schema_id(self) -> str:
"""URL friendly string, uniquely identifies json schema."""
return self._get("schemaID")
@property
def object_keys(self) -> List[ObjectKeyView]:
"""List of object keys."""
return self._map_list_optional("objectKeys", ObjectKeyView)
@property
def file_id(self) -> Optional[str]:
"""File identifier. Use filebox to upload file. See :ref:`filebox`
for information about filebox.
"""
return self._get_optional("fileID")
[docs]
class TaskErrorView(JsonObjectView):
"""Task error view."""
@property
def code(self) -> str:
"""Error code."""
return self._get("code")
[docs]
class TaskView(JsonObjectView):
"""Task view"""
@property
def id(self) -> uuid.UUID:
"""Task identifier."""
return uuid.UUID(self._get("id"))
@property
def params(self) -> TaskParamsView:
"""Task params."""
return TaskParamsView(self._get("params"))
@property
def state(self) -> TaskState:
"""Task state."""
return TaskState(self._get("state"))
@property
def result(self) -> Optional[JsonObject]:
"""Task result.
Note:
This value is present if task state is :attr:`~.TaskState.Completed`
"""
if result := self._data.get("result", None):
return cast(JsonObject, result)
return None
@property
def error(self) -> Optional[TaskErrorView]:
"""Task error.
Note:
This value is present if task state is :attr:`~.TaskState.Failed`
"""
if error := self._data.get("error", None):
return TaskErrorView(error)
return None