Coverage for notion_client/client.py: 100%
127 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 10:36 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-26 10:36 +0000
1"""Synchronous and asynchronous clients for Notion's API."""
3import json
4import logging
5from abc import abstractmethod
6from dataclasses import dataclass
7from types import TracebackType
8from typing import Any, Dict, List, Optional, Type, Union
10import httpx
11from httpx import Request, Response
13from notion_client.api_endpoints import (
14 BlocksEndpoint,
15 CommentsEndpoint,
16 DatabasesEndpoint,
17 PagesEndpoint,
18 SearchEndpoint,
19 UsersEndpoint,
20 FileUploadsEndpoint,
21)
22from notion_client.errors import (
23 APIResponseError,
24 HTTPResponseError,
25 RequestTimeoutError,
26 is_api_error_code,
27)
28from notion_client.logging import make_console_logger
29from notion_client.typing import SyncAsync
@dataclass
class ClientOptions:
    """Options to configure the client.

    Every field has a default, so ``ClientOptions()`` is a valid
    configuration; fields can be overridden by keyword.

    Attributes:
        auth: Bearer token for authentication. If left undefined, the `auth`
            parameter should be set on each request.
        timeout_ms: Number of milliseconds to wait before emitting a
            `RequestTimeoutError`.
        base_url: The root URL for sending API requests. This can be changed
            to test with a mock server.
        log_level: Verbosity of logs the instance will produce. By default,
            logs are written to `stdout`.
        logger: A custom logger.
        notion_version: Notion version to use.
    """

    auth: Optional[str] = None
    timeout_ms: int = 60_000
    base_url: str = "https://api.notion.com"
    log_level: int = logging.WARNING
    logger: Optional[logging.Logger] = None
    notion_version: str = "2022-06-28"
class BaseClient:
    """Shared plumbing for the synchronous and asynchronous Notion clients.

    Holds the resolved options, the logger, a stack of configured httpx
    clients and one endpoint object per API area. Subclasses provide
    ``request``.

    NOTE: ``request`` is decorated with ``@abstractmethod`` but this class
    does not derive from ``abc.ABC``, so Python does not block instantiation
    of ``BaseClient`` itself; the decorator is documentation only here.
    """

    def __init__(
        self,
        client: Union[httpx.Client, httpx.AsyncClient],
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        **kwargs: Any,
    ) -> None:
        """Resolve options, set up logging and register the first client.

        Args:
            client: The httpx client used to send requests. It is
                reconfigured (base URL, timeout, headers) when assigned.
            options: Either a ``ClientOptions`` instance or a dict of its
                fields. When given, ``kwargs`` are not consulted.
            **kwargs: ``ClientOptions`` fields, used only when ``options``
                is ``None``.
        """
        if options is None:
            options = ClientOptions(**kwargs)
        elif isinstance(options, dict):
            options = ClientOptions(**options)

        self.logger = options.logger or make_console_logger()
        self.logger.setLevel(options.log_level)
        self.options = options

        # Stack of clients: the ``client`` property reads the top, and its
        # setter pushes. Must exist before the first assignment below.
        self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
        self.client = client

        self.blocks = BlocksEndpoint(self)
        self.databases = DatabasesEndpoint(self)
        self.users = UsersEndpoint(self)
        self.pages = PagesEndpoint(self)
        self.search = SearchEndpoint(self)
        self.comments = CommentsEndpoint(self)
        self.file_uploads = FileUploadsEndpoint(self)

    @property
    def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
        """Return the most recently registered httpx client."""
        return self._clients[-1]

    @client.setter
    def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
        """Configure ``client`` from ``self.options`` and push it on the stack.

        The client's base URL, timeout and headers are replaced, so any
        headers previously set on the passed-in client are discarded.
        """
        # Strip trailing slashes so "https://host/" does not yield "//v1/".
        root = str(self.options.base_url).rstrip("/")
        client.base_url = httpx.URL(f"{root}/v1/")
        client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
        client.headers = httpx.Headers(
            {
                "Notion-Version": self.options.notion_version,
                "User-Agent": "ramnes/notion-sdk-py@2.5.0",
            }
        )
        if self.options.auth:
            client.headers["Authorization"] = f"Bearer {self.options.auth}"
        self._clients.append(client)

    def _build_request(
        self,
        method: str,
        path: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Request:
        """Build an httpx request for the current client.

        Args:
            method: HTTP method.
            path: Path passed to the client's ``build_request``.
            query: Query-string parameters.
            body: JSON body; only sent when ``form_data`` is empty or None.
            form_data: Multipart form fields. When non-empty, ``body`` is
                not sent.
            auth: Per-request bearer token; sets the ``Authorization``
                header on this request.

        Returns:
            The request produced by the current client's ``build_request``.
        """
        headers = httpx.Headers()
        if auth:
            headers["Authorization"] = f"Bearer {auth}"
        # %-style arguments defer string formatting until a handler actually
        # emits the record.
        self.logger.info("%s %s%s", method, self.client.base_url, path)
        self.logger.debug("=> %s -- %s -- %s", query, body, form_data)

        if not form_data:
            return self.client.build_request(
                method,
                path,
                params=query,
                json=body,
                headers=headers,
            )

        # Split form fields: tuples of length >= 2 and objects with a
        # ``read`` attribute are sent as files, everything else as string
        # form data.
        files: Dict[str, Any] = {}
        data: Dict[str, Any] = {}
        for key, value in form_data.items():
            if isinstance(value, tuple) and len(value) >= 2:
                files[key] = value
            elif hasattr(value, "read"):
                files[key] = value
            elif isinstance(value, str):
                data[key] = value
            else:
                data[key] = str(value)

        return self.client.build_request(
            method,
            path,
            params=query,
            files=files,
            data=data,
            headers=headers,
        )

    def _parse_response(self, response: Response) -> Any:
        """Return the decoded JSON body, or raise for an error status.

        Raises:
            APIResponseError: The error body is a JSON object whose ``code``
                is accepted by ``is_api_error_code`` and which carries a
                ``message``.
            HTTPResponseError: Any other error status, including bodies that
                are not JSON, not a JSON object, or lack a usable
                code/message.
        """
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as error:
            try:
                payload = error.response.json()
            except json.JSONDecodeError:
                payload = None
            # Only a JSON object can carry code/message; lists, strings or
            # null fall through to the generic HTTP error instead of raising
            # AttributeError/KeyError.
            if isinstance(payload, dict):
                code = payload.get("code")
                if (
                    code
                    and isinstance(code, str)
                    and "message" in payload
                    and is_api_error_code(code)
                ):
                    raise APIResponseError(response, payload["message"], code)
            raise HTTPResponseError(error.response)

        body = response.json()
        self.logger.debug("=> %s", body)

        return body

    @abstractmethod
    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> SyncAsync[Any]:
        # noqa
        pass
class Client(BaseClient):
    """Synchronous client for Notion's API."""

    client: httpx.Client

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.Client] = None,
        **kwargs: Any,
    ) -> None:
        """Create a synchronous client.

        Args:
            options: ``ClientOptions`` instance or dict of its fields.
            client: httpx client to use; a new ``httpx.Client`` is created
                when omitted.
            **kwargs: Forwarded to ``BaseClient.__init__``.
        """
        if client is None:
            client = httpx.Client()
        super().__init__(client, options, **kwargs)

    def __enter__(self) -> "Client":
        """Push a fresh ``httpx.Client`` and enter its context."""
        self.client = httpx.Client()
        self.client.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the inner client's context and pop it from the stack."""
        try:
            self.client.__exit__(exc_type, exc_value, traceback)
        finally:
            # Always pop the client pushed by __enter__, even if closing it
            # raised, so the previous client becomes current again.
            del self._clients[-1]

    def close(self) -> None:
        """Close the connection pool of the current inner client."""
        self.client.close()

    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Any:
        """Send an HTTP request.

        Returns:
            The value returned by ``_parse_response`` for the response.

        Raises:
            RequestTimeoutError: httpx raised a ``TimeoutException`` while
                sending.
        """
        request = self._build_request(method, path, query, body, form_data, auth)
        try:
            response = self.client.send(request)
        except httpx.TimeoutException:
            raise RequestTimeoutError()
        return self._parse_response(response)
class AsyncClient(BaseClient):
    """Asynchronous client for Notion's API."""

    client: httpx.AsyncClient

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.AsyncClient] = None,
        **kwargs: Any,
    ) -> None:
        """Create an asynchronous client.

        Args:
            options: ``ClientOptions`` instance or dict of its fields.
            client: httpx client to use; a new ``httpx.AsyncClient`` is
                created when omitted.
            **kwargs: Forwarded to ``BaseClient.__init__``.
        """
        if client is None:
            client = httpx.AsyncClient()
        super().__init__(client, options, **kwargs)

    async def __aenter__(self) -> "AsyncClient":
        """Push a fresh ``httpx.AsyncClient`` and enter its context."""
        self.client = httpx.AsyncClient()
        await self.client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the inner client's context and pop it from the stack."""
        try:
            await self.client.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Always pop the client pushed by __aenter__, even if closing it
            # raised, so the previous client becomes current again.
            del self._clients[-1]

    async def aclose(self) -> None:
        """Close the connection pool of the current inner client."""
        await self.client.aclose()

    async def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Any:
        """Send an HTTP request asynchronously.

        Returns:
            The value returned by ``_parse_response`` for the response.

        Raises:
            RequestTimeoutError: httpx raised a ``TimeoutException`` while
                sending.
        """
        request = self._build_request(method, path, query, body, form_data, auth)
        try:
            response = await self.client.send(request)
        except httpx.TimeoutException:
            raise RequestTimeoutError()
        return self._parse_response(response)