Coverage for notion_client/client.py: 100%
138 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 11:21 +0000
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-14 11:21 +0000
1"""Synchronous and asynchronous clients for Notion's API."""
3import base64
4import json
5import logging
6from abc import abstractmethod
7from dataclasses import dataclass
8from types import TracebackType
9from typing import Any, Dict, List, Optional, Type, Union
11import httpx
12from httpx import Request, Response
14from notion_client.api_endpoints import (
15 BlocksEndpoint,
16 CommentsEndpoint,
17 DatabasesEndpoint,
18 DataSourcesEndpoint,
19 PagesEndpoint,
20 SearchEndpoint,
21 UsersEndpoint,
22 FileUploadsEndpoint,
23 OAuthEndpoint,
24)
25from notion_client.errors import (
26 APIResponseError,
27 HTTPResponseError,
28 RequestTimeoutError,
29 is_api_error_code,
30)
31from notion_client.logging import make_console_logger
32from notion_client.typing import SyncAsync
@dataclass
class ClientOptions:
    """Configuration bundle consumed by the Notion clients.

    Every field has a default, so ``ClientOptions()`` is a valid configuration.

    Attributes:
        auth: Bearer token used for authentication. When left as ``None``, the
            `auth` parameter should be supplied on each request instead.
        timeout_ms: How long to wait, in milliseconds, before a
            `RequestTimeoutError` is emitted.
        base_url: Root URL that API requests are sent to. Override it to point
            the client at a mock server while testing.
        log_level: Verbosity of the logs produced by the instance. By default,
            logs are written to `stdout`.
        logger: A custom logger to use in place of the default one.
        notion_version: Notion API version to use.
    """

    # Authentication.
    auth: Optional[str] = None

    # Transport.
    timeout_ms: int = 60_000
    base_url: str = "https://api.notion.com"

    # Logging.
    log_level: int = logging.WARNING
    logger: Optional[logging.Logger] = None

    # API versioning.
    notion_version: str = "2025-09-03"
class BaseClient:
    """Shared plumbing for the synchronous and asynchronous Notion clients.

    Holds the resolved `ClientOptions`, the logger, a stack of configured
    httpx clients, and one endpoint object per API area. Subclasses provide
    `request`, which sends what `_build_request` builds and hands the
    response to `_parse_response`.

    Note: this class does not derive from `abc.ABC`, so the `@abstractmethod`
    marker on `request` documents intent but is not enforced at instantiation.
    """

    def __init__(
        self,
        client: Union[httpx.Client, httpx.AsyncClient],
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        **kwargs: Any,
    ) -> None:
        """Resolve the options, configure `client`, and create the endpoints.

        Args:
            client: The httpx client that will carry the requests. It is
                reconfigured by the `client` property setter.
            options: A `ClientOptions` instance, or a dict of its fields.
                When `None`, the options are built from `kwargs` instead.
            **kwargs: `ClientOptions` fields; only used when `options` is
                `None`.
        """
        if options is None:
            options = ClientOptions(**kwargs)
        elif isinstance(options, dict):
            options = ClientOptions(**options)

        self.logger = options.logger or make_console_logger()
        self.logger.setLevel(options.log_level)
        self.options = options

        # `self.options` must be assigned before `self.client`: the property
        # setter below reads it to configure the httpx client.
        self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
        self.client = client

        self.blocks = BlocksEndpoint(self)
        self.databases = DatabasesEndpoint(self)
        self.data_sources = DataSourcesEndpoint(self)
        self.users = UsersEndpoint(self)
        self.pages = PagesEndpoint(self)
        self.search = SearchEndpoint(self)
        self.comments = CommentsEndpoint(self)
        self.file_uploads = FileUploadsEndpoint(self)
        self.oauth = OAuthEndpoint(self)

    @property
    def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
        """Return the most recently registered httpx client."""
        return self._clients[-1]

    @client.setter
    def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
        """Configure `client` from `self.options` and push it on the stack.

        The client's base URL, timeout and headers are replaced; the
        `Authorization` header is only set when `options.auth` is truthy.
        """
        client.base_url = httpx.URL(f"{self.options.base_url}/v1/")
        client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
        client.headers = httpx.Headers(
            {
                "Notion-Version": self.options.notion_version,
                "User-Agent": "ramnes/notion-sdk-py@2.7.0",
            }
        )
        if self.options.auth:
            client.headers["Authorization"] = f"Bearer {self.options.auth}"
        self._clients.append(client)

    def _build_request(
        self,
        method: str,
        path: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[Union[str, Dict[str, str]]] = None,
    ) -> Request:
        """Build (but do not send) an httpx request.

        Args:
            method: HTTP method.
            path: Path passed to the httpx client's `build_request`.
            query: Query-string parameters.
            body: JSON body; only used when `form_data` is empty.
            form_data: Multipart form fields. Tuples of length >= 2 and
                objects with a `read` attribute are sent as files; every
                other value is sent as a string field.
            auth: Per-request credentials. A dict produces a `Basic` header
                from its `client_id` and `client_secret` entries; any other
                truthy value is used as a `Bearer` token.

        Returns:
            The request produced by the current httpx client.
        """
        headers = httpx.Headers()
        if auth:
            if isinstance(auth, dict):
                client_id = auth.get("client_id", "")
                client_secret = auth.get("client_secret", "")
                credentials = f"{client_id}:{client_secret}"
                encoded_credentials = base64.b64encode(credentials.encode()).decode()
                headers["Authorization"] = f"Basic {encoded_credentials}"
            else:
                headers["Authorization"] = f"Bearer {auth}"

        # Lazy %-style arguments: the message is only rendered when the level
        # is enabled, so payloads are not stringified on every request.
        self.logger.info("%s %s%s", method, self.client.base_url, path)
        self.logger.debug("=> %s -- %s -- %s", query, body, form_data)

        if not form_data:
            return self.client.build_request(
                method,
                path,
                params=query,
                json=body,
                headers=headers,
            )

        files: Dict[str, Any] = {}
        data: Dict[str, Any] = {}
        for key, value in form_data.items():
            is_file_tuple = isinstance(value, tuple) and len(value) >= 2
            if is_file_tuple or hasattr(value, "read"):
                files[key] = value
            else:
                data[key] = value if isinstance(value, str) else str(value)

        return self.client.build_request(
            method,
            path,
            params=query,
            files=files,
            data=data,
            headers=headers,
        )

    def _parse_response(self, response: Response) -> Any:
        """Return the decoded JSON body, or raise on an HTTP error status.

        Raises:
            APIResponseError: The error body is a JSON object whose `code`
                is accepted by `is_api_error_code`.
            HTTPResponseError: Any other HTTP error status, including error
                bodies that are not JSON or are not a JSON object.
        """
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as error:
            try:
                body = error.response.json()
            except ValueError:
                # json.JSONDecodeError and UnicodeDecodeError are both
                # ValueError subclasses: the body is not usable JSON.
                body = None

            # Only a JSON object can describe an API error. A list, string
            # or null body falls through to the generic HTTP error instead
            # of failing with AttributeError on `.get`.
            if isinstance(body, dict):
                code = body.get("code")
                if code and is_api_error_code(code):
                    raise APIResponseError(
                        response,
                        # Tolerate a missing message rather than raising
                        # KeyError while reporting another error.
                        body.get("message", ""),
                        code,
                        body.get("additional_data"),
                        body.get("request_id"),
                    )
            raise HTTPResponseError(error.response)

        body = response.json()
        self.logger.debug("=> %s", body)

        return body

    @abstractmethod
    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[Union[str, Dict[str, str]]] = None,
    ) -> SyncAsync[Any]:
        """Send a request; implemented by the sync and async subclasses."""
        # noqa
        pass
class Client(BaseClient):
    """Synchronous client for Notion's API.

    Can be used directly or as a context manager. Entering the context
    registers a fresh `httpx.Client` as the current inner client; leaving it
    closes that client and removes it again.
    """

    client: httpx.Client

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.Client] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client.

        Args:
            options: A `ClientOptions` instance or a dict of its fields.
            client: The `httpx.Client` to use; a new one is created when
                omitted.
            **kwargs: `ClientOptions` fields, used when `options` is `None`.
        """
        if client is None:
            client = httpx.Client()
        super().__init__(client, options, **kwargs)

    def __enter__(self) -> "Client":
        """Register a new inner `httpx.Client` and enter its context."""
        self.client = httpx.Client()
        self.client.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the inner client's context and drop it from the stack.

        All three arguments are `None` when the `with` body exits cleanly.
        """
        self.client.__exit__(exc_type, exc_value, traceback)
        del self._clients[-1]

    def close(self) -> None:
        """Close the connection pool of the current inner client."""
        self.client.close()

    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[Union[str, Dict[str, str]]] = None,
    ) -> Any:
        """Send an HTTP request.

        Returns:
            The value produced by `_parse_response` for the response.

        Raises:
            RequestTimeoutError: The underlying httpx call timed out.
        """
        request = self._build_request(method, path, query, body, form_data, auth)
        try:
            response = self.client.send(request)
        except httpx.TimeoutException as error:
            # Chain explicitly so the httpx timeout stays reachable through
            # `__cause__` on the library-level error.
            raise RequestTimeoutError() from error
        return self._parse_response(response)
class AsyncClient(BaseClient):
    """Asynchronous client for Notion's API.

    Can be used directly or as an async context manager. Entering the context
    registers a fresh `httpx.AsyncClient` as the current inner client;
    leaving it closes that client and removes it again.
    """

    client: httpx.AsyncClient

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.AsyncClient] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client.

        Args:
            options: A `ClientOptions` instance or a dict of its fields.
            client: The `httpx.AsyncClient` to use; a new one is created when
                omitted.
            **kwargs: `ClientOptions` fields, used when `options` is `None`.
        """
        if client is None:
            client = httpx.AsyncClient()
        super().__init__(client, options, **kwargs)

    async def __aenter__(self) -> "AsyncClient":
        """Register a new inner `httpx.AsyncClient` and enter its context."""
        self.client = httpx.AsyncClient()
        await self.client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the inner client's context and drop it from the stack.

        All three arguments are `None` when the `async with` body exits
        cleanly.
        """
        await self.client.__aexit__(exc_type, exc_value, traceback)
        del self._clients[-1]

    async def aclose(self) -> None:
        """Close the connection pool of the current inner client."""
        await self.client.aclose()

    async def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[Union[str, Dict[str, str]]] = None,
    ) -> Any:
        """Send an HTTP request asynchronously.

        Returns:
            The value produced by `_parse_response` for the response.

        Raises:
            RequestTimeoutError: The underlying httpx call timed out.
        """
        request = self._build_request(method, path, query, body, form_data, auth)
        try:
            response = await self.client.send(request)
        except httpx.TimeoutException as error:
            # Chain explicitly so the httpx timeout stays reachable through
            # `__cause__` on the library-level error.
            raise RequestTimeoutError() from error
        return self._parse_response(response)