Coverage for notion_client/client.py: 100%
114 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 10:21 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 10:21 +0000
1"""Synchronous and asynchronous clients for Notion's API."""
2import json
3import logging
4from abc import abstractclassmethod
5from dataclasses import dataclass
6from types import TracebackType
7from typing import Any, Dict, List, Optional, Type, Union
9import httpx
10from httpx import Request, Response
12from notion_client.api_endpoints import (
13 BlocksEndpoint,
14 CommentsEndpoint,
15 DatabasesEndpoint,
16 PagesEndpoint,
17 SearchEndpoint,
18 UsersEndpoint,
19)
20from notion_client.errors import (
21 APIResponseError,
22 HTTPResponseError,
23 RequestTimeoutError,
24 is_api_error_code,
25)
26from notion_client.logging import make_console_logger
27from notion_client.typing import SyncAsync
30@dataclass
31class ClientOptions:
32 """Options to configure the client.
34 Attributes:
35 auth: Bearer token for authentication. If left undefined, the `auth` parameter
36 should be set on each request.
37 timeout_ms: Number of milliseconds to wait before emitting a
38 `RequestTimeoutError`.
39 base_url: The root URL for sending API requests. This can be changed to test with
40 a mock server.
41 log_level: Verbosity of logs the instance will produce. By default, logs are
42 written to `stdout`.
43 logger: A custom logger.
44 notion_version: Notion version to use.
45 """
47 auth: Optional[str] = None
48 timeout_ms: int = 60_000
49 base_url: str = "https://api.notion.com"
50 log_level: int = logging.WARNING
51 logger: Optional[logging.Logger] = None
52 notion_version: str = "2022-06-28"
55class BaseClient:
56 def __init__(
57 self,
58 client: Union[httpx.Client, httpx.AsyncClient],
59 options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
60 **kwargs: Any,
61 ) -> None:
62 if options is None:
63 options = ClientOptions(**kwargs)
64 elif isinstance(options, dict):
65 options = ClientOptions(**options)
67 self.logger = options.logger or make_console_logger()
68 self.logger.setLevel(options.log_level)
69 self.options = options
71 self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
72 self.client = client
74 self.blocks = BlocksEndpoint(self)
75 self.databases = DatabasesEndpoint(self)
76 self.users = UsersEndpoint(self)
77 self.pages = PagesEndpoint(self)
78 self.search = SearchEndpoint(self)
79 self.comments = CommentsEndpoint(self)
81 @property
82 def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
83 return self._clients[-1]
85 @client.setter
86 def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
87 client.base_url = httpx.URL(f"{self.options.base_url}/v1/")
88 client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
89 client.headers = httpx.Headers(
90 {
91 "Notion-Version": self.options.notion_version,
92 "User-Agent": "ramnes/notion-sdk-py@2.3.0",
93 }
94 )
95 if self.options.auth:
96 client.headers["Authorization"] = f"Bearer {self.options.auth}"
97 self._clients.append(client)
99 def _build_request(
100 self,
101 method: str,
102 path: str,
103 query: Optional[Dict[Any, Any]] = None,
104 body: Optional[Dict[Any, Any]] = None,
105 auth: Optional[str] = None,
106 ) -> Request:
107 headers = httpx.Headers()
108 if auth:
109 headers["Authorization"] = f"Bearer {auth}"
110 self.logger.info(f"{method} {self.client.base_url}{path}")
111 self.logger.debug(f"=> {query} -- {body}")
112 return self.client.build_request(
113 method, path, params=query, json=body, headers=headers
114 )
116 def _parse_response(self, response: Response) -> Any:
117 try:
118 response.raise_for_status()
119 except httpx.HTTPStatusError as error:
120 try:
121 body = error.response.json()
122 code = body.get("code")
123 except json.JSONDecodeError:
124 code = None
125 if code and is_api_error_code(code):
126 raise APIResponseError(response, body["message"], code)
127 raise HTTPResponseError(error.response)
129 body = response.json()
130 self.logger.debug(f"=> {body}")
132 return body
134 @abstractclassmethod
135 def request(
136 self,
137 path: str,
138 method: str,
139 query: Optional[Dict[Any, Any]] = None,
140 body: Optional[Dict[Any, Any]] = None,
141 auth: Optional[str] = None,
142 ) -> SyncAsync[Any]:
143 # noqa
144 pass
147class Client(BaseClient):
148 """Synchronous client for Notion's API."""
150 client: httpx.Client
152 def __init__(
153 self,
154 options: Optional[Union[Dict[Any, Any], ClientOptions]] = None,
155 client: Optional[httpx.Client] = None,
156 **kwargs: Any,
157 ) -> None:
158 if client is None:
159 client = httpx.Client()
160 super().__init__(client, options, **kwargs)
162 def __enter__(self) -> "Client":
163 self.client = httpx.Client()
164 self.client.__enter__()
165 return self
167 def __exit__(
168 self,
169 exc_type: Type[BaseException],
170 exc_value: BaseException,
171 traceback: TracebackType,
172 ) -> None:
173 self.client.__exit__(exc_type, exc_value, traceback)
174 del self._clients[-1]
176 def close(self) -> None:
177 """Close the connection pool of the current inner client."""
178 self.client.close()
180 def request(
181 self,
182 path: str,
183 method: str,
184 query: Optional[Dict[Any, Any]] = None,
185 body: Optional[Dict[Any, Any]] = None,
186 auth: Optional[str] = None,
187 ) -> Any:
188 """Send an HTTP request."""
189 request = self._build_request(method, path, query, body, auth)
190 try:
191 response = self.client.send(request)
192 except httpx.TimeoutException:
193 raise RequestTimeoutError()
194 return self._parse_response(response)
197class AsyncClient(BaseClient):
198 """Asynchronous client for Notion's API."""
200 client: httpx.AsyncClient
202 def __init__(
203 self,
204 options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
205 client: Optional[httpx.AsyncClient] = None,
206 **kwargs: Any,
207 ) -> None:
208 if client is None:
209 client = httpx.AsyncClient()
210 super().__init__(client, options, **kwargs)
212 async def __aenter__(self) -> "AsyncClient":
213 self.client = httpx.AsyncClient()
214 await self.client.__aenter__()
215 return self
217 async def __aexit__(
218 self,
219 exc_type: Type[BaseException],
220 exc_value: BaseException,
221 traceback: TracebackType,
222 ) -> None:
223 await self.client.__aexit__(exc_type, exc_value, traceback)
224 del self._clients[-1]
226 async def aclose(self) -> None:
227 """Close the connection pool of the current inner client."""
228 await self.client.aclose()
230 async def request(
231 self,
232 path: str,
233 method: str,
234 query: Optional[Dict[Any, Any]] = None,
235 body: Optional[Dict[Any, Any]] = None,
236 auth: Optional[str] = None,
237 ) -> Any:
238 """Send an HTTP request asynchronously."""
239 request = self._build_request(method, path, query, body, auth)
240 try:
241 response = await self.client.send(request)
242 except httpx.TimeoutException:
243 raise RequestTimeoutError()
244 return self._parse_response(response)