为避免因某些网络或其他不可控因素导致的功能问题。比如发送请求的时候,因为网络不稳定,经常会出现请求超时的问题。这种情况下,我们通常会在代码中加入重试代码。重试代码本身并不难实现,但是如何写得优雅好用才是我们需要考虑的。这里要介绍的是一个第三方库——Tenacity(题目中的重试机制并不准确,它不是Python的内置模块,所以不能称之为机制),它实现了我们几乎所有的东西可以使用所有的重试场景,比如:什么情况下应该进行重试?重试多少次?结束重试需要多长时间?每次重试间隔多长时间?重试失败后回调?在使用它之前,先安装它$pipinstalltenacity1。最基本的重试无条件重试,无条件重试fromtenacityimportretry@retrydeftest_retry():print("waitingforretry,nointervalbetweenretriesExecute...")raiseExceptiontest_retry()无条件重试,但等待2秒在从韧性导入重试重试之前,wait_fixed@retry(wait=wait_fixed(2))deftest_retry():print("waitforretry...")raiseExceptiontest_retry()2。设置停止基本条件只重试7次fromtenacityimportretry,stop_after_attempt@retry(stop=stop_after_attempt(7))deftest_retry():print("Waitingforretry...")raiseExceptiontest_retry()10秒后重试fromtenacityimportretry,stop_after_delay@retry(stop=stop_after_delay(10))deftest_retry():print("Waitingforretry...")raiseExceptiontest_retry()或者满足以上两个条件之一,重试将从韧性导入重试结束,stop_after_delay,stop_after_attempt@retry(stop=(stop_after_delay(10)|stop_after_attempt(7)))deftest_retry():print("Waitingforretry...")raiseExceptiontest_retry()3.设置特定错误/异常(如请求超时)时重试,retryfromrequestsimportexceptionsfromtenacityimportretry,retry_if_exception_type@retry(retry=retry_if_exception_type(exceptions.Timeout))deftest_retry():print("Waitingforretry...")raiseexceptions.Timeouttest_retry()当满足自定义条件时,则继续重试下面的例子,当test_retry函数的返回值为False时,重试fromtenacityimportretry,stop_after_attempt,retry_if_resultdefis_false(value):返回值为False@retry(stop=stop_after_attempt(3),retry=retry_if_result(is_f))deftest_retry():returnFalsetest_retry()4.重试后,错误被重新抛出。当出现异常时,tenacity会重试。如果重试后仍然失败,默认情况下,上面抛出的异常会变成RetryError,而不是rootcause。所以可以加一个参数(reraise=True),这样当重试失败时,抛出的异常还是原来的异常。从韧性导入重试,stop_after_attempt@retry(stop=stop_after_attempt(7),reraise=True)deftest_retry():print("waitingforretry...")raiseExceptiontest_retry()5。设置上次重试时的回调函数失败后可以执行回调函数fromtenacityimport*defreturn_last_value(retry_state):print("executecallbackfunction")returnretry_state.outcome.result()#表示返回的返回值原函数defis_false(value):返回值为False@retry(stop=stop_after_attempt(3),retry_error_callback=return_last_value,retry=retry_if_result(is_false))deftest_retry():print("Waitingforretrying...")returnFalseprint(test_retry())输出等待如下Retrying...Waitingforretrying...Waitingforretrying...执行回调函数False以上就是本次分享的全部内容。觉得文章还不错的话,请关注公众号:Python编程学习圈,每日干货分享,发送“J”还能领取大量学习资料。或者去编程学习网了解更多编程技术知识。
