Coverage for notion_client/client.py: 100%

114 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-13 22:22 +0000

1"""Synchronous and asynchronous clients for Notion's API.""" 

2import json 

3import logging 

4from abc import abstractclassmethod 

5from dataclasses import dataclass 

6from types import TracebackType 

7from typing import Any, Dict, List, Optional, Type, Union 

8 

9import httpx 

10from httpx import Request, Response 

11 

12from notion_client.api_endpoints import ( 

13 BlocksEndpoint, 

14 CommentsEndpoint, 

15 DatabasesEndpoint, 

16 PagesEndpoint, 

17 SearchEndpoint, 

18 UsersEndpoint, 

19) 

20from notion_client.errors import ( 

21 APIResponseError, 

22 HTTPResponseError, 

23 RequestTimeoutError, 

24 is_api_error_code, 

25) 

26from notion_client.logging import make_console_logger 

27from notion_client.typing import SyncAsync 

28 

29 

@dataclass
class ClientOptions:
    """Configuration bundle consumed by the Notion clients.

    Every field has a default, so ``ClientOptions()`` is a valid configuration.

    Attributes:
        auth: Bearer token for authentication. When set, the client sends it
            as an ``Authorization: Bearer ...`` header; when left as ``None``,
            the `auth` parameter should be supplied on each request instead.
        timeout_ms: Request timeout in milliseconds. The client converts it to
            seconds for the HTTP layer and raises `RequestTimeoutError` when
            the HTTP layer reports a timeout.
        base_url: Root URL for API requests (``/v1/`` is appended by the
            client). Point this at a mock server for testing.
        log_level: Level applied to the client's logger.
        logger: A custom logger. When ``None``, the client builds one with
            `make_console_logger`.
        notion_version: Value sent in the ``Notion-Version`` header.
    """

    # NOTE: field order is part of the positional constructor signature.
    auth: Optional[str] = None
    timeout_ms: int = 60_000
    base_url: str = "https://api.notion.com"
    log_level: int = logging.WARNING
    logger: Optional[logging.Logger] = None
    notion_version: str = "2022-06-28"

53 

54 

class BaseClient:
    """Shared plumbing for the synchronous and asynchronous Notion clients.

    Resolves the options, configures the logger, keeps a stack of inner httpx
    clients (the last one is the active one) and exposes one endpoint helper
    per API area. Subclasses implement `request`.
    """

    def __init__(
        self,
        client: Union[httpx.Client, httpx.AsyncClient],
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        **kwargs: Any,
    ) -> None:
        """Resolve options, configure logging and register the inner client.

        Args:
            client: The httpx client used to send requests. It is reconfigured
                (base URL, timeout, headers) by the `client` setter.
            options: A `ClientOptions` instance or a dict of its fields.
            **kwargs: `ClientOptions` fields; only used when `options` is
                ``None``.
        """
        if options is None:
            options = ClientOptions(**kwargs)
        elif isinstance(options, dict):
            options = ClientOptions(**options)

        self.logger = options.logger or make_console_logger()
        self.logger.setLevel(options.log_level)
        self.options = options

        # Stack of inner clients; the `client` property reads the top entry.
        self._clients: List[Union[httpx.Client, httpx.AsyncClient]] = []
        self.client = client

        self.blocks = BlocksEndpoint(self)
        self.databases = DatabasesEndpoint(self)
        self.users = UsersEndpoint(self)
        self.pages = PagesEndpoint(self)
        self.search = SearchEndpoint(self)
        self.comments = CommentsEndpoint(self)

    @property
    def client(self) -> Union[httpx.Client, httpx.AsyncClient]:
        """Return the most recently registered inner client."""
        return self._clients[-1]

    @client.setter
    def client(self, client: Union[httpx.Client, httpx.AsyncClient]) -> None:
        """Configure `client` from the options and push it onto the stack.

        The client's base URL, timeout and headers are overwritten.
        """
        client.base_url = httpx.URL(f"{self.options.base_url}/v1/")
        # Options carry milliseconds; httpx.Timeout is given seconds.
        client.timeout = httpx.Timeout(timeout=self.options.timeout_ms / 1_000)
        client.headers = httpx.Headers(
            {
                "Notion-Version": self.options.notion_version,
                "User-Agent": "ramnes/notion-sdk-py@2.2.1",
            }
        )
        if self.options.auth:
            client.headers["Authorization"] = f"Bearer {self.options.auth}"
        self._clients.append(client)

    def _build_request(
        self,
        method: str,
        path: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Request:
        """Build (but do not send) a request on the active inner client.

        Args:
            method: HTTP method.
            path: Request path, passed to the inner client's `build_request`.
            query: Query-string parameters.
            body: JSON body.
            auth: Per-request bearer token; when truthy it is sent as the
                request's ``Authorization`` header.

        Returns:
            The request object built by the inner client.
        """
        headers = httpx.Headers()
        if auth:
            headers["Authorization"] = f"Bearer {auth}"
        # Lazy %-style arguments: nothing is formatted when the level is off.
        self.logger.info("%s %s%s", method, self.client.base_url, path)
        self.logger.debug("=> %s -- %s", query, body)
        return self.client.build_request(
            method, path, params=query, json=body, headers=headers
        )

    def _parse_response(self, response: Response) -> Any:
        """Return the decoded JSON body, or raise for an error status.

        Raises:
            APIResponseError: The error body is a JSON object whose ``code``
                is recognised by `is_api_error_code`.
            HTTPResponseError: Any other error status, including bodies that
                are not JSON or not a JSON object.
        """
        try:
            response.raise_for_status()
        except httpx.HTTPStatusError as error:
            try:
                body = error.response.json()
            except json.JSONDecodeError:
                body = None
            # An error body may be valid JSON without being an object (e.g. a
            # bare string from a proxy); only objects can carry a code.
            code = body.get("code") if isinstance(body, dict) else None
            if code and is_api_error_code(code):
                raise APIResponseError(
                    response, body.get("message", ""), code
                ) from error
            raise HTTPResponseError(error.response) from error

        body = response.json()
        self.logger.debug("=> %s", body)

        return body

    # NOTE(review): `abstractclassmethod` is deprecated and wraps `request` as
    # a classmethod on this base class, although subclasses define it as an
    # instance method. Consider `abc.abstractmethod` once the import is added.
    @abstractclassmethod
    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> SyncAsync[Any]:
        """Send an HTTP request; implemented by the concrete clients."""
        # noqa
        pass

145 

146 

class Client(BaseClient):
    """Synchronous client for Notion's API."""

    client: httpx.Client

    def __init__(
        self,
        options: Optional[Union[Dict[Any, Any], ClientOptions]] = None,
        client: Optional[httpx.Client] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client, building a default `httpx.Client` if none is given.

        Args:
            options: A `ClientOptions` instance or a dict of its fields.
            client: Inner httpx client to use instead of a fresh one.
            **kwargs: Forwarded to `BaseClient.__init__`.
        """
        if client is None:
            client = httpx.Client()
        super().__init__(client, options, **kwargs)

    def __enter__(self) -> "Client":
        """Push a fresh inner client for the duration of the ``with`` block."""
        self.client = httpx.Client()
        self.client.__enter__()
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the inner client pushed by `__enter__` and pop it.

        The three arguments are ``None`` when the block exits without an
        exception.
        """
        try:
            self.client.__exit__(exc_type, exc_value, traceback)
        finally:
            # Always restore the previous client, even if closing failed,
            # so later requests do not go through a half-closed client.
            del self._clients[-1]

    def close(self) -> None:
        """Close the current inner client."""
        self.client.close()

    def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Any:
        """Send an HTTP request and return the parsed response body.

        Raises:
            RequestTimeoutError: The inner client reported a timeout.
        """
        request = self._build_request(method, path, query, body, auth)
        try:
            response = self.client.send(request)
        except httpx.TimeoutException as error:
            # Chain explicitly so the underlying timeout kind stays visible.
            raise RequestTimeoutError() from error
        return self._parse_response(response)

195 

196 

class AsyncClient(BaseClient):
    """Asynchronous client for Notion's API."""

    client: httpx.AsyncClient

    def __init__(
        self,
        options: Optional[Union[Dict[str, Any], ClientOptions]] = None,
        client: Optional[httpx.AsyncClient] = None,
        **kwargs: Any,
    ) -> None:
        """Create the client, building a default `httpx.AsyncClient` if none is given.

        Args:
            options: A `ClientOptions` instance or a dict of its fields.
            client: Inner httpx client to use instead of a fresh one.
            **kwargs: Forwarded to `BaseClient.__init__`.
        """
        if client is None:
            client = httpx.AsyncClient()
        super().__init__(client, options, **kwargs)

    async def __aenter__(self) -> "AsyncClient":
        """Push a fresh inner client for the duration of the ``async with`` block."""
        self.client = httpx.AsyncClient()
        await self.client.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Close the inner client pushed by `__aenter__` and pop it.

        The three arguments are ``None`` when the block exits without an
        exception.
        """
        try:
            await self.client.__aexit__(exc_type, exc_value, traceback)
        finally:
            # Always restore the previous client, even if closing failed,
            # so later requests do not go through a half-closed client.
            del self._clients[-1]

    async def aclose(self) -> None:
        """Close the current inner client."""
        await self.client.aclose()

    async def request(
        self,
        path: str,
        method: str,
        query: Optional[Dict[Any, Any]] = None,
        body: Optional[Dict[Any, Any]] = None,
        auth: Optional[str] = None,
    ) -> Any:
        """Send an HTTP request asynchronously and return the parsed body.

        Raises:
            RequestTimeoutError: The inner client reported a timeout.
        """
        request = self._build_request(method, path, query, body, auth)
        try:
            response = await self.client.send(request)
        except httpx.TimeoutException as error:
            # Chain explicitly so the underlying timeout kind stays visible.
            raise RequestTimeoutError() from error
        return self._parse_response(response)