ÿØÿà JPEG ÿþ;
| Server IP : 68.65.120.201 / Your IP : 216.73.216.221 Web Server : LiteSpeed System : Linux server179.web-hosting.com 4.18.0-513.18.1.lve.el8.x86_64 #1 SMP Thu Feb 22 12:55:50 UTC 2024 x86_64 User : taxhyuvu ( 2294) PHP Version : 8.1.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/api/ |
Upload File : |
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import json
import logging
from dataclasses import dataclass, fields
from typing import Any, Self
from urllib.parse import urljoin
from urllib.request import Request
from async_lru import alru_cache
from defence360agent.api.server import API, APIError
from defence360agent.contracts.license import LicenseCLN
from defence360agent.internals.iaid import (
IAIDTokenError,
IndependentAgentIDAPI,
)
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from imav.malwarelib.model import ImunifyPatchSubscription
logger = logging.getLogger(__name__)
@dataclass(frozen=True, eq=False)
class PurchaseEligibility:
eligible: bool = False
purchase_url: str | None = None
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
allowed_keys = {f.name for f in fields(cls)}
filtered_data = {k: v for k, v in data.items() if k in allowed_keys}
return cls(**filtered_data)
class PurchaseEligibilityError(Exception):
pass
class ImunifyPatchSubscriptionAPI(API):
SUBSCRIPTION_URL = urljoin(API._BASE_URL, "/api/patch/subscriptions/check")
ELIGIBILITY_URL = urljoin(
API._BASE_URL, "/api/products/patch/agent/generate-product-data"
)
CACHE_TTL = 3600 # 1 hour
@classmethod
async def get_subscriptions(cls, ids: list[str]) -> list[str] | None:
"""Get active subscriptions for specific ids.
Return None if interaction with the remote API failed.
"""
if not ids:
return []
if not (
token := await cls._get_token(
"Can't get iaid token: %s. Return empty subscription list."
)
):
return None
request = Request(
cls.SUBSCRIPTION_URL,
data=json.dumps({"users": ids}).encode(),
headers={"X-Auth": token, "Content-Type": "application/json"},
)
try:
result = await cls.async_request(request)
except APIError as exc:
logger.error(
"Failed to get subscriptions details: %s. "
"Return empty subscription list.",
exc,
)
return None
return result["patchActive"]
@classmethod
async def get_purchase_eligibility(cls) -> PurchaseEligibility:
try:
return await cls._get_purchase_eligibility()
except PurchaseEligibilityError:
return PurchaseEligibility()
@classmethod
@alru_cache(ttl=CACHE_TTL)
async def _get_purchase_eligibility(
cls,
) -> PurchaseEligibility:
"""
Asynchronously retrieves the customer's host purchase eligibility status
for Imunify Patch.
Sends an authenticated request to the configured external service
to determine whether the system is eligible for a purchase offer
and, if so, obtains the associated purchase URL.
Returns:
PurchaseEligibility: An instance of PurchaseEligibility containing
the eligibility flag and optional purchase URL.
In case of any of the following, the PurchaseEligibilityError is raised:
- Missing or invalid authentication token
- Failure to make a request (e.g. APIError)
- Malformed or incomplete response (missing required keys)
"""
if not (token := await cls._get_token()):
raise PurchaseEligibilityError()
data = {
"users": len(await HostingPanel().get_users()),
"custom_reseller_configured": LicenseCLN.is_custom_reseller_configured(),
}
request = Request(
cls.ELIGIBILITY_URL,
data=json.dumps(data).encode(),
headers={"X-Auth": token, "Content-Type": "application/json"},
)
try:
result = await cls.async_request(request)
except APIError as exc:
logger.error("Failed to get purchase URL: %s.", exc)
raise PurchaseEligibilityError()
if ("purchase_url" not in result) or ("eligible" not in result):
logger.error("Bad response from server %s.", result)
raise PurchaseEligibilityError()
return PurchaseEligibility.from_dict(result)
@staticmethod
async def _get_token(error_message: str | None = None) -> str | None:
try:
return await IndependentAgentIDAPI.get_token()
except IAIDTokenError as exc:
logger.error(
error_message or "Can't get IAID Token: %s.",
exc,
)
return None
def has_imunify_patch_subscriptions(_: str) -> bool:
return ImunifyPatchSubscription.select().exists()