Coverage for notion_client / client.py: 100%
242 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:35 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-17 21:35 +0000
1"""Synchronous and asynchronous clients for Notion's API."""
3import asyncio
4import base64
5import logging
6import math
7import random
8import time
9from abc import abstractmethod
10from dataclasses import dataclass, field
11from email.utils import parsedate_to_datetime
12from types import TracebackType
13from typing import Any, Dict, List, Optional, Type, Union
15import httpx
16from httpx import Request, Response
18from notion_client.api_endpoints import (
19 BlocksEndpoint,
20 CommentsEndpoint,
21 DatabasesEndpoint,
22 DataSourcesEndpoint,
23 PagesEndpoint,
24 SearchEndpoint,
25 UsersEndpoint,
26 FileUploadsEndpoint,
27 OAuthEndpoint,
28)
29from notion_client.errors import (
30 APIErrorCode,
31 APIResponseError,
32 build_request_error,
33 is_http_response_error,
34 is_notion_client_error,
35 NotionClientError,
36 RequestTimeoutError,
37 validate_request_path,
38)
39from notion_client.logging import make_console_logger
40from notion_client.typing import SyncAsync
43@dataclass
44class RetryOptions:
45 """Configuration for automatic retries on rate limit (429) and server errors.
47 Attributes:
48 max_retries: Maximum number of retry attempts. Set to 0 to disable retries.
49 initial_retry_delay_ms: Initial delay between retries in milliseconds.
50 Used as base for exponential back-off when retry-after header is absent.
51 max_retry_delay_ms: Maximum delay between retries in milliseconds.
52 """
54 max_retries: int = 2
55 initial_retry_delay_ms: int = 1000
56 max_retry_delay_ms: int = 60_000
59@dataclass
60class ClientOptions:
61 """Options to configure the client.
63 Attributes:
64 auth: Bearer token for authentication. If left undefined, the `auth` parameter
65 should be set on each request.
66 timeout_ms: Number of milliseconds to wait before emitting a
67 `RequestTimeoutError`.
68 base_url: The root URL for sending API requests. This can be changed to test
69 with a mock server.
70 log_level: Verbosity of logs the instance will produce. By default, logs are
71 written to `stdout`.
72 logger: A custom logger.
73 notion_version: Notion version to use.
74 retry: Configuration for automatic retries on rate limit (429) and server errors.
75 Set to False to disable retries entirely.
76 """
78 auth: Optional[str] = None
79 timeout_ms: int = 60_000
80 base_url: str = "https://api.notion.com"
81 log_level: int = logging.WARNING
82 logger: Optional[logging.Logger] = None
83 notion_version: str = "2025-09-03"
84 retry: Union[RetryOptions, bool] = field(default_factory=RetryOptions)
87class BaseClient:
88 def __init__(
89 self,
90 client: Union[httpx.Client, httpx.AsyncClient],
91 options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
92 **kwargs: Any,
93 ) -> None:
94 if options is None:
95 options = ClientOptions(**kwargs)
96 elif isinstance(options, dict):
97 options = ClientOptions(**options)
99 self.logger = options.logger or make_console_logger()
100 self.logger.setLevel(options.log_level)
101 self.options = options
103 if options.retry is False:
104 self._max_retries = 0
105 self._initial_retry_delay_ms = 0
106 self._max_retry_delay_ms = 0
107 elif isinstance(options.retry, RetryOptions):
108 self._max_retries = options.retry.max_retries
109 self._initial_retry_delay_ms = options.retry.initial_retry_delay_ms
110 self._max_retry_delay_ms = options.retry.max_retry_delay_ms
111 else:
112 retry_opts = RetryOptions()
113 self._max_retries = retry_opts.max_retries
114 self._initial_retry_delay_ms = retry_opts.initial_retry_delay_ms
115 self._max_retry_delay_ms = retry_opts.max_retry_delay_ms
117 self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
118 self.client = client
120 self.blocks = BlocksEndpoint(self)
121 self.databases = DatabasesEndpoint(self)
122 self.data_sources = DataSourcesEndpoint(self)
123 self.users = UsersEndpoint(self)
124 self.pages = PagesEndpoint(self)
125 self.search = SearchEndpoint(self)
126 self.comments = CommentsEndpoint(self)
127 self.file_uploads = FileUploadsEndpoint(self)
128 self.oauth = OAuthEndpoint(self)
130 @property
131 def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
132 return self._clients[-1]
134 @client.setter
135 def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
136 client.base_url = httpx.URL(f"{self.options.base_url}/v1/")
137 client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
138 client.headers = httpx.Headers(
139 {
140 "Notion-Version": self.options.notion_version,
141 "User-Agent": "ramnes/notion-sdk-py@3.0.0",
142 }
143 )
144 if self.options.auth:
145 client.headers["Authorization"] = f"Bearer {self.options.auth}"
146 self._clients.append(client)
148 def _build_request(
149 self,
150 method: str,
151 path: str,
152 query: Optional[Dict[Any, Any]] = None,
153 body: Optional[Dict[Any, Any]] = None,
154 form_data: Optional[Dict[Any, Any]] = None,
155 auth: Optional[Union[str, Dict[str, str]]] = None,
156 ) -> Request:
157 headers = httpx.Headers()
158 if auth:
159 if isinstance(auth, dict):
160 client_id = auth.get("client_id", "")
161 client_secret = auth.get("client_secret", "")
162 credentials = f"{client_id}:{client_secret}"
163 encoded_credentials = base64.b64encode(credentials.encode()).decode()
164 headers["Authorization"] = f"Basic {encoded_credentials}"
165 else:
166 headers["Authorization"] = f"Bearer {auth}"
168 if not form_data:
169 return self.client.build_request(
170 method,
171 path,
172 params=query,
173 json=body,
174 headers=headers,
175 )
177 files: Dict[str, Any] = {}
178 data: Dict[str, Any] = {}
179 for key, value in form_data.items():
180 if isinstance(value, tuple) and len(value) >= 2:
181 files[key] = value
182 elif hasattr(value, "read"):
183 files[key] = value
184 elif isinstance(value, str):
185 data[key] = value
186 else:
187 data[key] = str(value)
189 return self.client.build_request(
190 method,
191 path,
192 params=query,
193 files=files,
194 data=data,
195 headers=headers,
196 )
198 def _parse_response(self, response: Response) -> Any:
199 try:
200 response.raise_for_status()
201 except httpx.HTTPStatusError as error:
202 body_text = error.response.text
203 raise build_request_error(error.response, body_text)
205 return response.json()
207 def _extract_request_id(self, obj: Any) -> Optional[str]:
208 """Extracts request_id from an object if present."""
209 if isinstance(obj, dict):
210 return obj.get("request_id")
211 else:
212 request_id = getattr(obj, "request_id", None)
213 return request_id if isinstance(request_id, str) else None
215 def _log_request_success(self, method: str, path: str, response_body: Any) -> None:
216 """Logs a successful request."""
217 request_id = self._extract_request_id(response_body)
218 msg = f"request success: method={method}, path={path}"
219 if request_id:
220 msg += f", request_id={request_id}"
221 self.logger.info(msg)
223 def _log_request_error(self, error: NotionClientError, attempt: int = 0) -> None:
224 """Logs a request error with appropriate detail level."""
225 request_id = self._extract_request_id(error)
226 msg = f"request fail: code={error.code}, message={error}, attempt={attempt}"
227 if request_id:
228 msg += f", request_id={request_id}"
229 self.logger.warning(msg)
230 if is_http_response_error(error):
231 self.logger.debug(f"failed response body: {error.body}")
233 def _can_retry(self, error: Exception, method: str) -> bool:
234 """Determines if an error can be retried based on its error code and method.
236 Rate limits (429) are always retryable since the server explicitly asks us to retry.
237 Server errors (500, 503) are only retried for idempotent methods
238 (GET, DELETE) to avoid duplicate side effects.
239 """
240 if not APIResponseError.is_api_response_error(error):
241 return False
243 # Rate limits are always retryable - server says "try again later"
244 if error.code == APIErrorCode.RateLimited:
245 return True
247 # Server errors only retry for idempotent methods
248 is_idempotent = method.upper() in ("GET", "DELETE")
249 if is_idempotent:
250 return error.code in (
251 APIErrorCode.InternalServerError,
252 APIErrorCode.ServiceUnavailable,
253 )
255 return False
257 def _calculate_retry_delay(self, error: Exception, attempt: int) -> float:
258 """Calculates the delay before the next retry attempt.
260 Uses retry-after header if present, otherwise exponential back-off with jitter.
261 Returns delay in seconds.
262 """
263 if APIResponseError.is_api_response_error(error):
264 retry_after_ms = self._parse_retry_after_header(error.headers)
265 if retry_after_ms is not None:
266 return min(retry_after_ms, self._max_retry_delay_ms) / 1000.0
268 # Exponential back-off with full jitter
269 base_delay = self._initial_retry_delay_ms * math.pow(2, attempt)
270 jitter = random.random()
271 delay = (
272 min(base_delay * jitter + base_delay / 2, self._max_retry_delay_ms) / 1000.0
273 )
274 return delay
276 def _parse_retry_after_header(self, headers: httpx.Headers) -> Optional[float]:
277 """Parses the retry-after header value.
279 Supports both delta-seconds (e.g., "120") and HTTP-date formats.
280 Returns the delay in milliseconds, or None if not present or invalid.
281 """
282 retry_after_value = headers.get("retry-after")
283 if not retry_after_value:
284 return None
286 # Try parsing as delta-seconds (integer)
287 try:
288 seconds = int(retry_after_value)
289 if seconds >= 0:
290 return seconds * 1000.0
291 except ValueError:
292 pass
294 # Try parsing as HTTP-date
295 try:
296 retry_date = parsedate_to_datetime(retry_after_value)
297 delay_ms = (retry_date.timestamp() - time.time()) * 1000.0
298 return delay_ms if delay_ms > 0 else 0.0
299 except (ValueError, TypeError):
300 pass
302 return None
304 @abstractmethod
305 def request(
306 self,
307 path: str,
308 method: str,
309 query: Optional[Dict[Any, Any]] = None,
310 body: Optional[Dict[Any, Any]] = None,
311 form_data: Optional[Dict[Any, Any]] = None,
312 auth: Optional[Union[str, Dict[str, str]]] = None,
313 ) -> SyncAsync[Any]:
314 # noqa
315 pass
318class Client(BaseClient):
319 """Synchronous client for Notion's API."""
321 client: httpx.Client
323 def __init__(
324 self,
325 options: Optional[Union[Dict[Any, Any], ClientOptions]] = None,
326 client: Optional[httpx.Client] = None,
327 **kwargs: Any,
328 ) -> None:
329 if client is None:
330 client = httpx.Client()
331 super().__init__(client, options, **kwargs)
333 def __enter__(self) -> "Client":
334 self.client = httpx.Client()
335 self.client.__enter__()
336 return self
338 def __exit__(
339 self,
340 exc_type: Type[BaseException],
341 exc_value: BaseException,
342 traceback: TracebackType,
343 ) -> None:
344 self.client.__exit__(exc_type, exc_value, traceback)
345 del self._clients[-1]
347 def close(self) -> None:
348 """Close the connection pool of the current inner client."""
349 self.client.close()
351 def request(
352 self,
353 path: str,
354 method: str,
355 query: Optional[Dict[Any, Any]] = None,
356 body: Optional[Dict[Any, Any]] = None,
357 form_data: Optional[Dict[Any, Any]] = None,
358 auth: Optional[Union[str, Dict[str, str]]] = None,
359 ) -> Any:
360 """Send an HTTP request."""
361 validate_request_path(path)
362 self.logger.info(f"{method} {self.client.base_url}{path}")
363 return self._execute_with_retry(method, path, query, body, form_data, auth)
365 def _execute_with_retry(
366 self,
367 method: str,
368 path: str,
369 query: Optional[Dict[Any, Any]],
370 body: Optional[Dict[Any, Any]],
371 form_data: Optional[Dict[Any, Any]],
372 auth: Optional[Union[str, Dict[str, str]]],
373 ) -> Any:
374 """Executes the request with retry logic."""
375 attempt = 0
376 while True:
377 request = self._build_request(method, path, query, body, form_data, auth)
378 try:
379 return self._execute_single_request(request, method, path)
380 except Exception as error:
381 if not is_notion_client_error(error):
382 raise error
384 self._log_request_error(error, attempt)
386 if attempt >= self._max_retries or not self._can_retry(error, method):
387 raise error
389 delay = self._calculate_retry_delay(error, attempt)
390 self.logger.info(
391 f"retrying request: method={method}, path={path}, attempt={attempt + 1}, delay_ms={delay * 1000:.0f}"
392 )
393 time.sleep(delay)
394 attempt += 1
396 def _execute_single_request(self, request: Request, method: str, path: str) -> Any:
397 """Executes a single HTTP request (no retry)."""
398 try:
399 response = self.client.send(request)
400 except httpx.TimeoutException:
401 raise RequestTimeoutError()
402 response_body = self._parse_response(response)
403 self._log_request_success(method, path, response_body)
404 return response_body
407class AsyncClient(BaseClient):
408 """Asynchronous client for Notion's API."""
410 client: httpx.AsyncClient
412 def __init__(
413 self,
414 options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
415 client: Optional[httpx.AsyncClient] = None,
416 **kwargs: Any,
417 ) -> None:
418 if client is None:
419 client = httpx.AsyncClient()
420 super().__init__(client, options, **kwargs)
422 async def __aenter__(self) -> "AsyncClient":
423 self.client = httpx.AsyncClient()
424 await self.client.__aenter__()
425 return self
427 async def __aexit__(
428 self,
429 exc_type: Type[BaseException],
430 exc_value: BaseException,
431 traceback: TracebackType,
432 ) -> None:
433 await self.client.__aexit__(exc_type, exc_value, traceback)
434 del self._clients[-1]
436 async def aclose(self) -> None:
437 """Close the connection pool of the current inner client."""
438 await self.client.aclose()
440 async def request(
441 self,
442 path: str,
443 method: str,
444 query: Optional[Dict[Any, Any]] = None,
445 body: Optional[Dict[Any, Any]] = None,
446 form_data: Optional[Dict[Any, Any]] = None,
447 auth: Optional[Union[str, Dict[str, str]]] = None,
448 ) -> Any:
449 """Send an HTTP request asynchronously."""
450 validate_request_path(path)
451 self.logger.info(f"{method} {self.client.base_url}{path}")
452 return await self._execute_with_retry(
453 method, path, query, body, form_data, auth
454 )
456 async def _execute_with_retry(
457 self,
458 method: str,
459 path: str,
460 query: Optional[Dict[Any, Any]],
461 body: Optional[Dict[Any, Any]],
462 form_data: Optional[Dict[Any, Any]],
463 auth: Optional[Union[str, Dict[str, str]]],
464 ) -> Any:
465 """Executes the request with retry logic."""
466 attempt = 0
467 while True:
468 request = self._build_request(method, path, query, body, form_data, auth)
469 try:
470 return await self._execute_single_request(request, method, path)
471 except Exception as error:
472 if not is_notion_client_error(error):
473 raise error
475 self._log_request_error(error, attempt)
477 if attempt >= self._max_retries or not self._can_retry(error, method):
478 raise error
480 delay = self._calculate_retry_delay(error, attempt)
481 self.logger.info(
482 f"retrying request: method={method}, path={path}, attempt={attempt + 1}, delay_ms={delay * 1000:.0f}"
483 )
484 await asyncio.sleep(delay)
485 attempt += 1
487 async def _execute_single_request(
488 self, request: Request, method: str, path: str
489 ) -> Any:
490 """Executes a single HTTP request (no retry)."""
491 try:
492 response = await self.client.send(request)
493 except httpx.TimeoutException:
494 raise RequestTimeoutError()
495 response_body = self._parse_response(response)
496 self._log_request_success(method, path, response_body)
497 return response_body