Coverage for notion_client/client.py: 100%

130 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-04 09:02 +0000

1"""Synchronous and asynchronous clients for Notion's API.""" 

2 

3import json 

4import logging 

5from abc import abstractmethod 

6from dataclasses import dataclass 

7from types import TracebackType 

8from typing import Any, Dict, List, Optional, Type, Union 

9 

10import httpx 

11from httpx import Request, Response 

12 

13from notion_client.api_endpoints import ( 

14 BlocksEndpoint, 

15 CommentsEndpoint, 

16 DatabasesEndpoint, 

17 DataSourcesEndpoint, 

18 PagesEndpoint, 

19 SearchEndpoint, 

20 UsersEndpoint, 

21 FileUploadsEndpoint, 

22) 

23from notion_client.errors import ( 

24 APIResponseError, 

25 HTTPResponseError, 

26 RequestTimeoutError, 

27 is_api_error_code, 

28) 

29from notion_client.logging import make_console_logger 

30from notion_client.typing import SyncAsync 

31 

32 

@dataclass
class ClientOptions:
    """Options to configure the client.

    Attributes:
        auth: Bearer token for authentication. If left undefined, the `auth`
            parameter should be set on each request.
        timeout_ms: Number of milliseconds to wait before emitting a
            `RequestTimeoutError`.
        base_url: The root URL for sending API requests. This can be changed to
            test with a mock server.
        log_level: Verbosity of logs the instance will produce. By default, logs
            are written to `stdout`.
        logger: A custom logger.
        notion_version: Notion version to use.
    """

    # Field order is part of the interface: the dataclass-generated __init__
    # accepts these positionally in this exact order.
    auth: Optional[str] = None
    timeout_ms: int = 60_000
    base_url: str = "https://api.notion.com"
    log_level: int = logging.WARNING
    logger: Optional[logging.Logger] = None
    notion_version: str = "2025-09-03"

56 

57 

class BaseClient:
    """Shared plumbing for the synchronous and asynchronous Notion clients.

    Holds the resolved `ClientOptions`, a logger, a stack of inner httpx
    clients (the last one is the active one), and one endpoint object per
    API area. Subclasses provide `request`.
    """

    def __init__(
        self,
        client: Union[httpx.Client, httpx.AsyncClient],
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        **kwargs: Any,
    ) -> None:
        """Resolve the options, configure the inner client and build endpoints.

        Args:
            client: The httpx client used to send requests. It is reconfigured
                by the `client` setter.
            options: A `ClientOptions` instance, or a dict of its fields.
            **kwargs: `ClientOptions` fields; only used when `options` is None.
        """
        if options is None:
            options = ClientOptions(**kwargs)
        elif isinstance(options, dict):
            options = ClientOptions(**options)

        self.logger = options.logger or make_console_logger()
        self.logger.setLevel(options.log_level)
        self.options = options

        # The stack must exist before `self.client = ...`, because the property
        # setter appends to it.
        self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
        self.client = client

        self.blocks = BlocksEndpoint(self)
        self.databases = DatabasesEndpoint(self)
        self.data_sources = DataSourcesEndpoint(self)
        self.users = UsersEndpoint(self)
        self.pages = PagesEndpoint(self)
        self.search = SearchEndpoint(self)
        self.comments = CommentsEndpoint(self)
        self.file_uploads = FileUploadsEndpoint(self)

    @property
    def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
        """Return the active inner client, i.e. the top of the client stack."""
        return self._clients[-1]

    @client.setter
    def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
        """Configure `client` from the options and push it onto the stack.

        The client's base URL, timeout and headers are overwritten; any headers
        previously set on `client` are replaced by the ones built here.
        """
        client.base_url = httpx.URL(f"{self.options.base_url}/v1/")
        # Options carry milliseconds; httpx.Timeout is given seconds.
        client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
        client.headers = httpx.Headers(
            {
                "Notion-Version": self.options.notion_version,
                "User-Agent": "ramnes/notion-sdk-py@2.7.0",
            }
        )
        if self.options.auth:
            client.headers["Authorization"] = f"Bearer {self.options.auth}"
        self._clients.append(client)

    def _build_request(
        self,
        method: str,
        path: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Request:
        """Build an httpx request for the active client.

        Args:
            method: HTTP method.
            path: Request path, resolved by the inner client against its base URL.
            query: Query-string parameters.
            body: JSON body; only sent when `form_data` is empty or None.
            form_data: Form fields. When non-empty, the request is built from
                `files`/`data` instead of a JSON body.
            auth: Per-request bearer token, sent as an `Authorization` header.

        Returns:
            The request produced by the inner client's `build_request`.
        """
        headers = httpx.Headers()
        if auth:
            headers["Authorization"] = f"Bearer {auth}"
        self.logger.info(f"{method} {self.client.base_url}{path}")
        self.logger.debug(f"=> {query} -- {body} -- {form_data}")

        if not form_data:
            return self.client.build_request(
                method,
                path,
                params=query,
                json=body,
                headers=headers,
            )

        # Split form fields: tuples of two or more items and objects with a
        # `read` attribute go to `files`; everything else becomes a string
        # field in `data`.
        files: Dict[str, Any] = {}
        data: Dict[str, Any] = {}
        for key, value in form_data.items():
            if isinstance(value, tuple) and len(value) >= 2:
                files[key] = value
            elif hasattr(value, "read"):
                files[key] = value
            elif isinstance(value, str):
                data[key] = value
            else:
                data[key] = str(value)

        return self.client.build_request(
            method,
            path,
            params=query,
            files=files,
            data=data,
            headers=headers,
        )

    def _parse_response(self, response: Response) -> Any:
        """Return the decoded JSON body, or raise for an error status.

        Args:
            response: The httpx response to inspect.

        Returns:
            The decoded JSON body of a successful response.

        Raises:
            APIResponseError: The error body is a JSON object whose `code` is
                accepted by `is_api_error_code`.
            HTTPResponseError: Any other error status, including bodies that
                are not JSON or not a JSON object.
        """
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as error:
            code = None
            additional_data = None
            request_id = None
            try:
                body = error.response.json()
            except json.JSONDecodeError:
                body = None
            # Only a JSON object can carry the error fields. A list, string or
            # number (e.g. from an intermediary) must not crash on `.get` and
            # hide the HTTP error.
            if isinstance(body, dict):
                code = body.get("code")
                additional_data = body.get("additional_data")
                request_id = body.get("request_id")
            if code and is_api_error_code(code):
                raise APIResponseError(
                    response,
                    body["message"],
                    code,
                    additional_data,
                    request_id,
                )
            raise HTTPResponseError(error.response)

        body = response.json()
        self.logger.debug(f"=> {body}")

        return body

    @abstractmethod
    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> SyncAsync[Any]:
        # noqa
        pass

189 

190 

class Client(BaseClient):
    """Synchronous client for Notion's API."""

    client: httpx.Client

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.Client] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client, defaulting to a new `httpx.Client`.

        Args:
            options: A `ClientOptions` instance, or a dict of its fields.
            client: Inner httpx client to use; a new one is created when None.
            **kwargs: Forwarded to `BaseClient.__init__`.
        """
        if client is None:
            client = httpx.Client()
        super().__init__(client, options, **kwargs)

    def __enter__(self) -> "Client":
        """Push a new `httpx.Client` onto the client stack and enter it."""
        self.client = httpx.Client()
        self.client.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the active inner client and pop it from the client stack.

        All three arguments are None when the `with` body finished without
        raising.
        """
        try:
            self.client.__exit__(exc_type, exc_value, traceback)
        finally:
            # Pop even if the inner client's exit raised, so the client pushed
            # by __enter__ does not stay active.
            del self._clients[-1]

    def close(self) -> None:
        """Close the connection pool of the current inner client."""
        self.client.close()

    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Any:
        """Send an HTTP request.

        Returns:
            The value returned by `_parse_response` for the response.

        Raises:
            RequestTimeoutError: The inner client raised
                `httpx.TimeoutException`.
        """
        request = self._build_request(method, path, query, body, form_data, auth)
        try:
            response = self.client.send(request)
        except httpx.TimeoutException:
            raise RequestTimeoutError()
        return self._parse_response(response)

240 

241 

class AsyncClient(BaseClient):
    """Asynchronous client for Notion's API."""

    client: httpx.AsyncClient

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.AsyncClient] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client, defaulting to a new `httpx.AsyncClient`.

        Args:
            options: A `ClientOptions` instance, or a dict of its fields.
            client: Inner httpx client to use; a new one is created when None.
            **kwargs: Forwarded to `BaseClient.__init__`.
        """
        if client is None:
            client = httpx.AsyncClient()
        super().__init__(client, options, **kwargs)

    async def __aenter__(self) -> "AsyncClient":
        """Push a new `httpx.AsyncClient` onto the client stack and enter it."""
        self.client = httpx.AsyncClient()
        await self.client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Exit the active inner client and pop it from the client stack.

        All three arguments are None when the `async with` body finished
        without raising.
        """
        try:
            await self.client.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Pop even if the inner client's exit raised, so the client pushed
            # by __aenter__ does not stay active.
            del self._clients[-1]

    async def aclose(self) -> None:
        """Close the connection pool of the current inner client."""
        await self.client.aclose()

    async def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        form_data: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Any:
        """Send an HTTP request asynchronously.

        Returns:
            The value returned by `_parse_response` for the response.

        Raises:
            RequestTimeoutError: The inner client raised
                `httpx.TimeoutException`.
        """
        request = self._build_request(method, path, query, body, form_data, auth)
        try:
            response = await self.client.send(request)
        except httpx.TimeoutException:
            raise RequestTimeoutError()
        return self._parse_response(response)