class AsyncRdf4j:
    """Asynchronous entry point for interacting with an RDF4J server.

    The instance owns an ``AsyncApiClient`` bound to the server's base URL and
    can be used as an async context manager, which delegates entering and
    exiting to that client.
    """

    _client: AsyncApiClient
    _base_url: str

    def __init__(self, base_url: str):
        """Initializes the RDF4J API client.

        Args:
            base_url (str): Base URL of the RDF4J server. Trailing slashes
                are stripped before the URL is stored.
        """
        self._base_url = base_url.rstrip("/")
        self._client = AsyncApiClient(base_url=self._base_url)

    async def __aenter__(self) -> Self:
        """Enters the async context and initializes the HTTP client.

        Returns:
            AsyncRdf4j: This instance, after the underlying client has been
            entered.
        """
        await self._client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Closes the HTTP client when exiting the async context.

        The exception details are forwarded unchanged to the underlying
        client's ``__aexit__``.
        """
        await self._client.__aexit__(exc_type, exc_value, traceback)

    async def get_protocol_version(self) -> str:
        """Fetches the RDF4J protocol version.

        Returns:
            str: The body of the ``/protocol`` response.

        Raises:
            httpx.HTTPStatusError: If the request fails.
        """
        response = await self._client.get("/protocol")
        response.raise_for_status()
        return response.text

    async def list_repositories(self) -> list[RepositoryMetadata]:
        """Lists all available RDF4J repositories.

        Returns:
            list[RepositoryMetadata]: One metadata object per query solution
            in the server's response.

        Raises:
            httpx.HTTPStatusError: If the request fails.
            TypeError: If the parsed response is not a set of query solutions.
        """
        response = await self._client.get(
            "/repositories",
            headers={"Accept": Rdf4jContentType.SPARQL_RESULTS_JSON},
        )
        # Fail on an error status before parsing; otherwise an error body
        # would be handed to the SPARQL results parser and surface as an
        # unrelated parse failure instead of the documented HTTPStatusError.
        response.raise_for_status()

        query_solutions = og.parse_query_results(
            response.text, format=og.QueryResultsFormat.JSON
        )
        if not isinstance(query_solutions, og.QuerySolutions):
            raise TypeError(
                f"Expected QuerySolutions, got {type(query_solutions).__name__}"
            )
        return [
            RepositoryMetadata.from_sparql_query_solution(query_solution)
            for query_solution in query_solutions
        ]

    async def get_repository(self, repository_id: str) -> AsyncRdf4JRepository:
        """Gets an interface to a specific RDF4J repository.

        No request is sent; the returned object simply wraps this instance's
        client together with the given ID.

        Args:
            repository_id (str): The ID of the repository.

        Returns:
            AsyncRdf4JRepository: An async interface for the repository.
        """
        return AsyncRdf4JRepository(self._client, repository_id)

    async def create_repository(
        self,
        config: RepositoryConfig,
    ) -> AsyncRdf4JRepository:
        """Creates a new RDF4J repository using RDF configuration.

        The configuration is serialized to Turtle and sent with a PUT request
        to ``/repositories/{config.repo_id}``.

        Args:
            config (RepositoryConfig): RDF configuration; its ``repo_id``
                determines the ID of the repository to create.

        Returns:
            AsyncRdf4JRepository: An async interface to the newly created
            repository.

        Raises:
            RepositoryCreationException: If the server does not answer with
                204 No Content.
        """
        path = f"/repositories/{config.repo_id}"
        headers = {"Content-Type": Rdf4jContentType.TURTLE.value}
        response: httpx.Response = await self._client.put(
            path, content=config.to_turtle(), headers=headers
        )
        if response.status_code != httpx.codes.NO_CONTENT:
            raise RepositoryCreationException(
                f"Repository creation failed: {response.status_code} - {response.text}"
            )
        return AsyncRdf4JRepository(self._client, config.repo_id)

    async def delete_repository(self, repository_id: str) -> None:
        """Deletes a repository and all its data and configuration.

        Args:
            repository_id (str): The ID of the repository to delete.

        Raises:
            RepositoryDeletionException: If the server does not answer with
                204 No Content.
        """
        path = f"/repositories/{repository_id}"
        response = await self._client.delete(path)
        if response.status_code != httpx.codes.NO_CONTENT:
            raise RepositoryDeletionException(
                f"Failed to delete repository '{repository_id}': {response.status_code} - {response.text}"
            )

    async def health_check(self) -> bool:
        """Checks if the RDF4J server is reachable and healthy.

        This method requests ``/protocol`` from the server to verify
        connectivity.

        Returns:
            bool: True if the server answers with 200 OK, False if it answers
            with any other status or if the request raises.

        Example:
            >>> async with AsyncRdf4j("http://localhost:8080/rdf4j-server") as db:
            ...     if await db.health_check():
            ...         print("Server is healthy")
            ...     else:
            ...         print("Server is not reachable")
        """
        try:
            response = await self._client.get("/protocol")
            return response.status_code == httpx.codes.OK
        except Exception:
            # Deliberately broad: this is a best-effort probe, so any failure
            # to obtain a response is reported as "not healthy".
            return False

    async def aclose(self) -> None:
        """Asynchronously closes the client connection."""
        await self._client.aclose()