Source code for cybsi.cloud.iocean.objects

from typing import Iterable, List, Optional, Tuple, cast
from urllib.parse import parse_qs, urlparse

import httpx
from enum_tools import document_enum

from ..enum import CybsiAPIEnum
from ..internal import BaseAPI, BaseAsyncAPI, JsonObject, JsonObjectView
from ..pagination import AsyncPage, Cursor, Page

_PATH = "/iocean/collections/{}/objects"


[docs] @document_enum class ObjectKeyType(CybsiAPIEnum): """Object key type.""" MD5Hash = "MD5Hash" SHA1Hash = "SHA1Hash" SHA256Hash = "SHA256Hash" SHA512Hash = "SHA512Hash" DomainName = "DomainName" URL = "URL" IPAddress = "IPAddress" IPNetwork = "IPNetwork"
[docs] @document_enum class ObjectType(CybsiAPIEnum): """Object type.""" File = "File" DomainName = "DomainName" URL = "URL" IPAddress = "IPAddress" IPNetwork = "IPNetwork"
[docs] @document_enum class ObjectOperation(CybsiAPIEnum): """Object change operation.""" Add = "Add" """Object was added to collection.""" Remove = "Remove" """Object war removed from collection.""" Update = "Update" """Object in collection was updated."""
[docs] class ObjectAPI(BaseAPI): """Object API."""
[docs] def add( self, *, collection_id: str, obj_type: ObjectType, keys: Iterable[Tuple[ObjectKeyType, str]], context: JsonObject = {}, ) -> None: """Add object to collection. If there is registered object with corresponding keys and there are no keys conflicts, this method: - rewrites object context with new one; - extends key set of the registered object. Note: Calls `POST /iocean/collections/{collection_id}/objects`. Args: collection_id: Collection identifier. obj_type: Type of the object. keys: Keys of the object. context: Additional data describing object. Raises: :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.SemanticError`: Request contains logic errors. Note: Semantic error codes specific for this method: * :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeyFormat` * :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeySet` * :attr:`~cybsi.cloud.error.SemanticErrorCodes.KeySetConflict` * :attr:`~cybsi.cloud.error.SemanticErrorCodes.SchemaCheckFail` """ path = _PATH.format(collection_id) payload = { "type": obj_type.value, "keys": [{"type": key[0].value, "value": key[1]} for key in keys], "context": context, } self._connector.do_post(path=path, json=payload)
[docs] def delete( self, *, collection_id: str, key_type: ObjectKeyType, key_value: str, ) -> None: """Delete object from collection. Note: Calls `DELETE /iocean/collections/{collection_id}/objects`. Args: collection_id: Collection identifier. key_type: Key type of object to be removed. key_value: Key value of object to be removed. Raises: :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.SemanticError`: Request contains logic errors. Note: Semantic error codes specific for this method: * :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeyFormat` """ params: JsonObject = { "objectKeyType": key_type.value, "objectKey": key_value, } path = _PATH.format(collection_id) self._connector.do_delete(path=path, params=params)
[docs] def filter( self, *, collection_id: str, cursor: Optional[Cursor] = None, limit: Optional[int] = None, ) -> Tuple[Page["ObjectView"], Optional[Cursor]]: """Get objects from the collection. Note: Calls `GET /iocean/collections/{collection_id}/objects`. Args: collection_id: Collection identifier. cursor: Page cursor. limit: Page limit. Return: Page with object views. The page contains next page cursor. Changes cursor. The cursor can be used to call :meth:`changes`. Note: Changes cursor is returned only on the first page. Raises: :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). """ params: JsonObject = {} if cursor is not None: params["cursor"] = str(cursor) if limit is not None: params["limit"] = limit path = _PATH.format(collection_id) resp = self._connector.do_get(path=path, params=params) return Page(self._connector.do_get, resp, ObjectView), _extract_changes_cursor( resp )
[docs] def changes( self, *, collection_id: str, cursor: Cursor, limit: Optional[int] = None, ) -> Page["ObjectChangeView"]: """Get objects changes from the collection. Note: Calls `GET /iocean/collections/{collection_id}/objects/changes`. Args: collection_id: Collection identifier. cursor: Page cursor. On the first request you should pass the cursor value obtained when requesting objects :meth:`filter`. Subsequent calls should use cursor property of the page returned by :meth:`changes`. limit: Page limit. Return: Page with changes. Warning: Cursor behaviour differs from other API methods. Do not save returned page cursor if it is :data:`None`. :data:`None` means that all changes **for this moment** are received. More changes can arrive later. Pass your previous non-none ``cursor`` value in loop, until :data:`None` cursor is returned. Please wait some time if method returns a page with :data:`None` cursor. Raises: :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). :class:`~cybsi.cloud.error.SemanticError`: Semantic request error. Note: Semantic error codes specific for this method: * :attr:`~cybsi.cloud.error.SemanticErrorCodes.CursorOutOfRange` """ params: JsonObject = {"cursor": cursor} if limit is not None: params["limit"] = limit path = _PATH.format(collection_id) + "/changes" resp = self._connector.do_get(path=path, params=params) return Page(self._connector.do_get, resp, ObjectChangeView)
[docs] class ObjectsAsyncAPI(BaseAsyncAPI): """Object asynchronous API."""
[docs] async def add( self, *, collection_id: str, obj_type: ObjectType, keys: Iterable[Tuple[ObjectKeyType, str]], context: JsonObject = {}, ) -> None: """Add object to collection. If there is registered object with corresponding keys and there are no keys conflicts, this method: - rewrites object context with new one; - extends key set of the registered object. Note: Calls `POST /iocean/collections/{collection_id}/objects`. Args: collection_id: Collection identifier. obj_type: Type of the object. keys: Keys of the object. context: Additional data describing object. Raises: :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.SemanticError`: Request contains logic errors. Note: Semantic error codes specific for this method: * :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeyFormat` * :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeySet` * :attr:`~cybsi.cloud.error.SemanticErrorCodes.KeySetConflict` * :attr:`~cybsi.cloud.error.SemanticErrorCodes.SchemaCheckFail` """ path = _PATH.format(collection_id) payload = { "type": obj_type.value, "keys": [{"type": key[0].value, "value": key[1]} for key in keys], "context": context, } await self._connector.do_post(path=path, json=payload)
[docs] async def delete( self, *, collection_id: str, key_type: ObjectKeyType, key_value: str, ) -> None: """Delete object from collection. Note: Calls `DELETE /iocean/collections/{collection_id}/objects`. Args: collection_id: Collection identifier. key_type: Key type of object to be removed. key_value: Key value of object to be removed. Raises: :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.SemanticError`: Request contains logic errors. Note: Semantic error codes specific for this method: * :attr:`~cybsi.cloud.error.SemanticErrorCodes.InvalidKeyFormat` """ params: JsonObject = { "objectKeyType": key_type.value, "objectKey": key_value, } path = _PATH.format(collection_id) await self._connector.do_delete(path=path, params=params)
[docs] async def filter( self, *, collection_id: str, cursor: Optional[Cursor] = None, limit: Optional[int] = None, ) -> Tuple[AsyncPage["ObjectView"], Optional[Cursor]]: """Get objects from the collection. Note: Calls `GET /iocean/collections/{collection_id}/objects`. Args: collection_id: Collection identifier. cursor: Page cursor. limit: Page limit. Return: Page with object views. The page contains next page cursor. Changes cursor. The cursor can be used to call :meth:`changes`. Note: Changes cursor is returned only on the first page. Raises: :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). """ params: JsonObject = {} if cursor is not None: params["cursor"] = str(cursor) if limit is not None: params["limit"] = limit path = _PATH.format(collection_id) resp = await self._connector.do_get(path=path, params=params) return AsyncPage( self._connector.do_get, resp, ObjectView ), _extract_changes_cursor(resp)
[docs] async def changes( self, *, collection_id: str, cursor: Cursor, limit: Optional[int] = None, ) -> AsyncPage["ObjectChangeView"]: """Get objects changes from the collection. Note: Calls `GET /iocean/collections/{collection_id}/objects/changes`. Args: collection_id: Collection identifier. cursor: Page cursor. On the first request you should pass the cursor value obtained when requesting objects :meth:`filter`. Subsequent calls should use cursor property of the page returned by :meth:`changes`. limit: Page limit. Return: Page with changes. Warning: Cursor behaviour differs from other API methods. Do not save returned page cursor if it is :data:`None`. :data:`None` means that all changes **for this moment** are received. More changes can arrive later. Pass your previous non-none ``cursor`` value in loop, until :data:`None` cursor is returned. Please wait some time if method returns a page with :data:`None` cursor. Raises: :class:`~cybsi.cloud.error.NotFoundError`: Collection not found. :class:`~cybsi.cloud.error.InvalidRequestError`: Provided values are invalid (see args value requirements). :class:`~cybsi.cloud.error.SemanticError`: Semantic request error. Note: Semantic error codes specific for this method: * :attr:`~cybsi.cloud.error.SemanticErrorCodes.CursorOutOfRange` """ params: JsonObject = {"cursor": cursor} if limit is not None: params["limit"] = limit path = _PATH.format(collection_id) + "/changes" resp = await self._connector.do_get(path=path, params=params) return AsyncPage(self._connector.do_get, resp, ObjectChangeView)
def _extract_changes_cursor(resp: httpx.Response) -> Optional[Cursor]: """Extracts changes cursor from response.""" related_url = resp.links.get("related", {}).get("url") if related_url is None: return None parsed = urlparse(related_url) query = parse_qs(parsed.query) cursor = query["cursor"] return cast(Optional[Cursor], cursor[0]) if cursor is not None else None
[docs] class ObjectKeyView(JsonObjectView): """Object key view""" @property def type(self) -> ObjectKeyType: """Object key type""" return ObjectKeyType.from_string(self._get("type")) @property def value(self) -> str: """Key value.""" return self._get("value")
[docs] class ObjectView(JsonObjectView): """Object view.""" @property def type(self) -> ObjectType: """Object type.""" return ObjectType.from_string(self._get("type")) @property def keys(self) -> List[ObjectKeyView]: """Object keys.""" return [ObjectKeyView(key) for key in self._get("keys")] @property def context(self) -> JsonObject: """Object context.""" return cast(JsonObject, self._get("context"))
[docs] class ObjectChangeView(JsonObjectView): """Object change view.""" @property def operation(self) -> ObjectOperation: """Object operation.""" return ObjectOperation.from_string(self._get("operation")) @property def obj(self) -> ObjectView: """Object.""" return ObjectView(self._get("object"))