Source code for rdf4j_python._driver._async_transaction

"""Async transaction support for RDF4J repositories."""

from enum import Enum
from types import TracebackType
from typing import TYPE_CHECKING, Iterable, Optional, Self

import httpx

from rdf4j_python.exception import TransactionError, TransactionStateError
from rdf4j_python.model.term import Quad, Triple
from rdf4j_python.utils.const import Rdf4jContentType
from rdf4j_python.utils.helpers import serialize_statements

if TYPE_CHECKING:
    from rdf4j_python._client import AsyncApiClient


[docs] class TransactionState(Enum): """Represents the state of a transaction.""" PENDING = "pending" # Transaction not yet started ACTIVE = "active" # Transaction is active COMMITTED = "committed" # Transaction has been committed ROLLED_BACK = "rolled_back" # Transaction has been rolled back
[docs] class IsolationLevel(Enum): """Transaction isolation levels supported by RDF4J. Note: Not all RDF4J store implementations support all isolation levels. If an unsupported level is requested, the store's default will be used. """ NONE = "NONE" READ_UNCOMMITTED = "READ_UNCOMMITTED" READ_COMMITTED = "READ_COMMITTED" SNAPSHOT_READ = "SNAPSHOT_READ" SNAPSHOT = "SNAPSHOT" SERIALIZABLE = "SERIALIZABLE"
[docs] class AsyncTransaction: """Async context manager for transactional operations on an RDF4J repository. Transactions allow grouping multiple operations (add, delete, update) into a single atomic unit. Either all operations succeed (commit) or none of them take effect (rollback). Usage as context manager (recommended): ```python async with repo.transaction() as txn: await txn.add_statements([quad1, quad2]) await txn.add_statements([quad3]) # Auto-commits on success, auto-rollbacks on exception ``` Manual usage: ```python txn = repo.transaction() await txn.begin() try: await txn.add_statements([quad1, quad2]) await txn.commit() except Exception: await txn.rollback() raise ``` Attributes: state: Current state of the transaction (PENDING, ACTIVE, COMMITTED, ROLLED_BACK) """ _client: "AsyncApiClient" _repository_id: str _transaction_id: Optional[str] _isolation_level: Optional[IsolationLevel] _state: TransactionState def __init__( self, client: "AsyncApiClient", repository_id: str, isolation_level: Optional[IsolationLevel] = None, ): """Initialize a transaction. Args: client: The async API client. repository_id: The repository ID. isolation_level: Optional isolation level for the transaction. """ self._client = client self._repository_id = repository_id self._transaction_id = None self._isolation_level = isolation_level self._state = TransactionState.PENDING @property def state(self) -> TransactionState: """Returns the current state of the transaction.""" return self._state @property def is_active(self) -> bool: """Returns True if the transaction is active.""" return self._state == TransactionState.ACTIVE async def __aenter__(self) -> Self: """Start the transaction when entering the context.""" await self.begin() return self async def __aexit__( self, exc_type: type[BaseException] | None, exc_value: BaseException | None, traceback: TracebackType | None, ) -> None: """Commit or rollback the transaction when exiting the context. If an exception occurred, the transaction is rolled back. Otherwise, it is committed. """ if self._state != TransactionState.ACTIVE: # Transaction already ended (manually committed/rolled back) return if exc_type is not None: # An exception occurred, rollback await self.rollback() else: # No exception, commit await self.commit()
[docs] async def begin(self) -> None: """Start the transaction. Raises: TransactionStateError: If the transaction has already been started. TransactionError: If the server fails to create the transaction. """ if self._state != TransactionState.PENDING: raise TransactionStateError( f"Cannot begin transaction: already in state {self._state.value}" ) path = f"/repositories/{self._repository_id}/transactions" params = {} if self._isolation_level is not None: params["isolation-level"] = self._isolation_level.value response = await self._client.post(path, headers={}, params=params) if response.status_code != httpx.codes.CREATED: raise TransactionError( f"Failed to start transaction: {response.status_code} - {response.text}" ) # Extract transaction ID from Location header location = response.headers.get("Location") if not location: raise TransactionError( "Server did not return transaction ID in Location header" ) # Location is like: /rdf4j-server/repositories/{id}/transactions/{txn-id} self._transaction_id = location.rstrip("/").split("/")[-1] self._state = TransactionState.ACTIVE
[docs] async def commit(self) -> None: """Commit the transaction, making all changes permanent. Raises: TransactionStateError: If the transaction is not active. TransactionError: If the commit fails. """ if self._state != TransactionState.ACTIVE: raise TransactionStateError( f"Cannot commit transaction: in state {self._state.value}" ) path = f"/repositories/{self._repository_id}/transactions/{self._transaction_id}" params = {"action": "COMMIT"} response = await self._client.put(path, params=params) if response.status_code not in (httpx.codes.OK, httpx.codes.NO_CONTENT): raise TransactionError( f"Failed to commit transaction: {response.status_code} - {response.text}" ) self._state = TransactionState.COMMITTED
[docs] async def rollback(self) -> None: """Rollback the transaction, discarding all changes. Raises: TransactionStateError: If the transaction is not active. TransactionError: If the rollback fails. """ if self._state != TransactionState.ACTIVE: raise TransactionStateError( f"Cannot rollback transaction: in state {self._state.value}" ) path = f"/repositories/{self._repository_id}/transactions/{self._transaction_id}" response = await self._client.delete(path) if response.status_code not in (httpx.codes.OK, httpx.codes.NO_CONTENT): raise TransactionError( f"Failed to rollback transaction: {response.status_code} - {response.text}" ) self._state = TransactionState.ROLLED_BACK
def _ensure_active(self) -> None: """Ensure the transaction is active before performing operations.""" if self._state != TransactionState.ACTIVE: raise TransactionStateError( f"Cannot perform operation: transaction in state {self._state.value}" )
[docs] async def add_statements( self, statements: Iterable[Quad] | Iterable[Triple] ) -> None: """Add statements to the repository within this transaction. Args: statements: The RDF statements to add. Raises: TransactionStateError: If the transaction is not active. TransactionError: If the operation fails. """ self._ensure_active() path = f"/repositories/{self._repository_id}/transactions/{self._transaction_id}" params = {"action": "ADD"} headers = {"Content-Type": Rdf4jContentType.NQUADS} response = await self._client.put( path, content=serialize_statements(statements), params=params, headers=headers, ) if response.status_code not in (httpx.codes.OK, httpx.codes.NO_CONTENT): raise TransactionError( f"Failed to add statements: {response.status_code} - {response.text}" )
[docs] async def delete_statements( self, statements: Iterable[Quad] | Iterable[Triple] ) -> None: """Delete specific statements from the repository within this transaction. Args: statements: The RDF statements to delete. Raises: TransactionStateError: If the transaction is not active. TransactionError: If the operation fails. """ self._ensure_active() path = f"/repositories/{self._repository_id}/transactions/{self._transaction_id}" params = {"action": "DELETE"} headers = {"Content-Type": Rdf4jContentType.NQUADS} response = await self._client.put( path, content=serialize_statements(statements), params=params, headers=headers, ) if response.status_code not in (httpx.codes.OK, httpx.codes.NO_CONTENT): raise TransactionError( f"Failed to delete statements: {response.status_code} - {response.text}" )
[docs] async def update(self, sparql_update: str) -> None: """Execute a SPARQL UPDATE within this transaction. Args: sparql_update: The SPARQL UPDATE query string. Raises: TransactionStateError: If the transaction is not active. TransactionError: If the operation fails. """ self._ensure_active() path = f"/repositories/{self._repository_id}/transactions/{self._transaction_id}" params = {"action": "UPDATE"} headers = {"Content-Type": Rdf4jContentType.SPARQL_UPDATE} response = await self._client.put( path, content=sparql_update, params=params, headers=headers, ) if response.status_code not in (httpx.codes.OK, httpx.codes.NO_CONTENT): raise TransactionError( f"Failed to execute update: {response.status_code} - {response.text}" )