Coverage for notion_client/client.py: 100%
130 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 09:02 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-04 09:02 +0000
1"""Synchronous and asynchronous clients for Notion's API."""
3import json
4import logging
5from abc import abstractmethod
6from dataclasses import dataclass
7from types import TracebackType
8from typing import Any, Dict, List, Optional, Type, Union
10import httpx
11from httpx import Request, Response
13from notion_client.api_endpoints import (
14 BlocksEndpoint,
15 CommentsEndpoint,
16 DatabasesEndpoint,
17 DataSourcesEndpoint,
18 PagesEndpoint,
19 SearchEndpoint,
20 UsersEndpoint,
21 FileUploadsEndpoint,
22)
23from notion_client.errors import (
24 APIResponseError,
25 HTTPResponseError,
26 RequestTimeoutError,
27 is_api_error_code,
28)
29from notion_client.logging import make_console_logger
30from notion_client.typing import SyncAsync
33@dataclass
34class ClientOptions:
35 """Options to configure the client.
37 Attributes:
38 auth: Bearer token for authentication. If left undefined, the `auth` parameter
39 should be set on each request.
40 timeout_ms: Number of milliseconds to wait before emitting a
41 `RequestTimeoutError`.
42 base_url: The root URL for sending API requests. This can be changed to test
43 with a mock server.
44 log_level: Verbosity of logs the instance will produce. By default, logs are
45 written to `stdout`.
46 logger: A custom logger.
47 notion_version: Notion version to use.
48 """
50 auth: Optional[str] = None
51 timeout_ms: int = 60_000
52 base_url: str = "https://api.notion.com"
53 log_level: int = logging.WARNING
54 logger: Optional[logging.Logger] = None
55 notion_version: str = "2025-09-03"
58class BaseClient:
59 def __init__(
60 self,
61 client: Union[httpx.Client, httpx.AsyncClient],
62 options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
63 **kwargs: Any,
64 ) -> None:
65 if options is None:
66 options = ClientOptions(**kwargs)
67 elif isinstance(options, dict):
68 options = ClientOptions(**options)
70 self.logger = options.logger or make_console_logger()
71 self.logger.setLevel(options.log_level)
72 self.options = options
74 self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
75 self.client = client
77 self.blocks = BlocksEndpoint(self)
78 self.databases = DatabasesEndpoint(self)
79 self.data_sources = DataSourcesEndpoint(self)
80 self.users = UsersEndpoint(self)
81 self.pages = PagesEndpoint(self)
82 self.search = SearchEndpoint(self)
83 self.comments = CommentsEndpoint(self)
84 self.file_uploads = FileUploadsEndpoint(self)
86 @property
87 def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
88 return self._clients[-1]
90 @client.setter
91 def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
92 client.base_url = httpx.URL(f"{self.options.base_url}/v1/")
93 client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
94 client.headers = httpx.Headers(
95 {
96 "Notion-Version": self.options.notion_version,
97 "User-Agent": "ramnes/notion-sdk-py@2.7.0",
98 }
99 )
100 if self.options.auth:
101 client.headers["Authorization"] = f"Bearer {self.options.auth}"
102 self._clients.append(client)
104 def _build_request(
105 self,
106 method: str,
107 path: str,
108 query: Optional[Dict[Any, Any]] = None,
109 body: Optional[Dict[Any, Any]] = None,
110 form_data: Optional[Dict[Any, Any]] = None,
111 auth: Optional[str] = None,
112 ) -> Request:
113 headers = httpx.Headers()
114 if auth:
115 headers["Authorization"] = f"Bearer {auth}"
116 self.logger.info(f"{method} {self.client.base_url}{path}")
117 self.logger.debug(f"=> {query} -- {body} -- {form_data}")
119 if not form_data:
120 return self.client.build_request(
121 method,
122 path,
123 params=query,
124 json=body,
125 headers=headers,
126 )
128 files: Dict[str, Any] = {}
129 data: Dict[str, Any] = {}
130 for key, value in form_data.items():
131 if isinstance(value, tuple) and len(value) >= 2:
132 files[key] = value
133 elif hasattr(value, "read"):
134 files[key] = value
135 elif isinstance(value, str):
136 data[key] = value
137 else:
138 data[key] = str(value)
140 return self.client.build_request(
141 method,
142 path,
143 params=query,
144 files=files,
145 data=data,
146 headers=headers,
147 )
149 def _parse_response(self, response: Response) -> Any:
150 try:
151 response.raise_for_status()
152 except httpx.HTTPStatusError as error:
153 try:
154 body = error.response.json()
155 code = body.get("code")
156 additional_data = body.get("additional_data")
157 request_id = body.get("request_id")
158 except json.JSONDecodeError:
159 code = None
160 additional_data = None
161 request_id = None
162 if code and is_api_error_code(code):
163 raise APIResponseError(
164 response,
165 body["message"],
166 code,
167 additional_data,
168 request_id,
169 )
170 raise HTTPResponseError(error.response)
172 body = response.json()
173 self.logger.debug(f"=> {body}")
175 return body
177 @abstractmethod
178 def request(
179 self,
180 path: str,
181 method: str,
182 query: Optional[Dict[Any, Any]] = None,
183 body: Optional[Dict[Any, Any]] = None,
184 form_data: Optional[Dict[Any, Any]] = None,
185 auth: Optional[str] = None,
186 ) -> SyncAsync[Any]:
187 # noqa
188 pass
191class Client(BaseClient):
192 """Synchronous client for Notion's API."""
194 client: httpx.Client
196 def __init__(
197 self,
198 options: Optional[Union[Dict[Any, Any], ClientOptions]] = None,
199 client: Optional[httpx.Client] = None,
200 **kwargs: Any,
201 ) -> None:
202 if client is None:
203 client = httpx.Client()
204 super().__init__(client, options, **kwargs)
206 def __enter__(self) -> "Client":
207 self.client = httpx.Client()
208 self.client.__enter__()
209 return self
211 def __exit__(
212 self,
213 exc_type: Type[BaseException],
214 exc_value: BaseException,
215 traceback: TracebackType,
216 ) -> None:
217 self.client.__exit__(exc_type, exc_value, traceback)
218 del self._clients[-1]
220 def close(self) -> None:
221 """Close the connection pool of the current inner client."""
222 self.client.close()
224 def request(
225 self,
226 path: str,
227 method: str,
228 query: Optional[Dict[Any, Any]] = None,
229 body: Optional[Dict[Any, Any]] = None,
230 form_data: Optional[Dict[Any, Any]] = None,
231 auth: Optional[str] = None,
232 ) -> Any:
233 """Send an HTTP request."""
234 request = self._build_request(method, path, query, body, form_data, auth)
235 try:
236 response = self.client.send(request)
237 except httpx.TimeoutException:
238 raise RequestTimeoutError()
239 return self._parse_response(response)
242class AsyncClient(BaseClient):
243 """Asynchronous client for Notion's API."""
245 client: httpx.AsyncClient
247 def __init__(
248 self,
249 options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
250 client: Optional[httpx.AsyncClient] = None,
251 **kwargs: Any,
252 ) -> None:
253 if client is None:
254 client = httpx.AsyncClient()
255 super().__init__(client, options, **kwargs)
257 async def __aenter__(self) -> "AsyncClient":
258 self.client = httpx.AsyncClient()
259 await self.client.__aenter__()
260 return self
262 async def __aexit__(
263 self,
264 exc_type: Type[BaseException],
265 exc_value: BaseException,
266 traceback: TracebackType,
267 ) -> None:
268 await self.client.__aexit__(exc_type, exc_value, traceback)
269 del self._clients[-1]
271 async def aclose(self) -> None:
272 """Close the connection pool of the current inner client."""
273 await self.client.aclose()
275 async def request(
276 self,
277 path: str,
278 method: str,
279 query: Optional[Dict[Any, Any]] = None,
280 body: Optional[Dict[Any, Any]] = None,
281 form_data: Optional[Dict[Any, Any]] = None,
282 auth: Optional[str] = None,
283 ) -> Any:
284 """Send an HTTP request asynchronously."""
285 request = self._build_request(method, path, query, body, form_data, auth)
286 try:
287 response = await self.client.send(request)
288 except httpx.TimeoutException:
289 raise RequestTimeoutError()
290 return self._parse_response(response)