微速訊:冪等 —— 讓函數再次純潔
純潔地做人,是一種美德。寫純函數,是程序員的美德。
純函數純函數的美德就是,對于同樣的輸入,總是給予相同的輸出,不改變系統的狀態,不產生非預期的行為。非純函數會導致各種意想不到的后果,并且很難排查原因。曾經遇到過一個奇葩的偶現問題,就是由于非純函數導致的,后面花費了很長時間才定位到問題,詳見:《做純潔的人,寫純潔的代碼》一文中的血淋淋的例子。
【資料圖】
雖然總是呼吁大家寫純函數,但現實總是會接觸到不純的函數代碼。上文中提到的例子,最后的修復辦法也是改寫了不純的部分代碼,讓原來的不純的函數成為一個純函數。那個問題雖然難以重現和排查,但好在修改起來的代碼量非常小,改寫是容易的。除這種情況之外,本文又提供了一個案例,不僅排查困難,而且改寫也麻煩。在這種情況下,又提供了另一種方式,只需要在不純的函數頭部添加一個冪等修飾符,就讓它再次純潔起來,這樣完全可以不用改寫。
一般我們說冪等,是從系統外部視角,針對的是用戶做的某些操作,映射為后端服務中的某些 API。比如我們常說某某接口是冪等的,特別是 GET 請求的接口,只要輸入參數一樣,返回結果永遠一樣。本文實現的冪等修飾符,是方法級別的,粒度更細。當然,如果這個方法本來就是純函數,自然不需要這個冪等修飾符了。如果某個方法有副作用,它就派上了用場,可以在不改變方法實現的前提下,讓它的行為和純函數一樣,完全不需要了解函數到底做了啥。
本文將從實際問題引入,并以 TDD (非教條主義的)的開發方式,以及漸進增強的思想,逐步實現這個“冪等修飾符”。具體編程語言是該實際項目用到的 TypeScript。
非純函數惹的禍但是,如果維護一個屎山項目時,你不要期待迎接你的全是純函數。我最近遇到一個情況,對于系統中的消息通知呀,有時候有用戶反映,一模一樣的通知,會收到兩條甚至更多條。
我看了一下代碼,發送消息是由一個長長的函數進行處理的,而這個函數有很多觸發路徑,比如接口觸發、定時觸發、消息隊列觸發等等,所以要從源頭找出原因并不容易。
可以看到,它的輸入參數是一個名為 post 的 PostRecord 對象,好在它有一個的 postId 鍵值,同一個消息推送通知,擁有一個唯一的 postId 鍵值:
export class PostRecord {...@Expose()postId: string...}
它的問題在于,對于同一個 postId 的消息推送通知對象,被不同的調用路徑分別處理了多次,所以很自然的想法是,使用 postId 作為緩存鍵,將發送結果(布爾值)作為緩存值,將第一次執行的結果保存下來,后面的執行直接短路返回第一次執行的結果值。而且,通過修飾符來添加這個功能,似乎是最優雅的,因為對原有代碼沒有任何修改,只在方法頭部增加一行而已:
+ @idempotent()public async handlePost(post: PostRecord): Promise{...版本一:一個幼稚的實現
只考慮一個服務實例的情況,也就是一個進程。那么只需要使用一個內存變量來做這個緩存存儲就行了。
先寫測試有了這個想法,為了文檔、以及保證后面的擴展過程順利,先寫測試來構建安全屏障。盡管測試也是一個一個實現的,但為了行文不啰嗦,這里直接貼出主要測試用例代碼:
import { idempotent } from "@/utils/idempotent"let count = 0class TestingClass {@idempotent()testMethod() {console.log("adding count = ", count)return count++}}describe("idempotent", () => {it("a function without idempotent annotation would be called multiple times", async () => {let c = 0const testFunction = jest.fn().mockImplementation(() => c++)testFunction()testFunction()expect(testFunction).toHaveBeenCalledTimes(2)expect(c).toEqual(2)})it("a function with idempotent annotation would only be called once", async () => {const sut = new TestingClass()sut.testMethod()sut.testMethod()expect(count).toEqual(1)})})
其實,主要的測試意圖就是,多次調用一個方法,如果該方法沒有冪等修飾符,那么該方法帶來的影響是多次執行;而如果這個方法有冪等修飾符呢,其效果是只有第一次是真正執行了,后續的執行被短路了。于是寫了這樣的一個幼稚實現版本:
實現const cache = {}export function idempotent() {console.log("making ", " idempotent")return function (target, propertyKey, descriptor) {console.log("idempotent called: ", target, propertyKey, descriptor)console.log("target.propertyKey = ", target[propertyKey].toString())const originalMethod = descriptor.valuedescriptor.value = () => {console.log("cache = ", cache)if (typeof cache[propertyKey] === "undefined") {cache[propertyKey] = originalMethod()}return cache[propertyKey]}console.log("target.propertyKey now = ", target[propertyKey].toString())}}
提交代碼。
增強版本然后再回頭審視代碼,這個緩存鍵用了方法名,但沒有類信息,會導致不同類的同一方法名,出現混淆情況。我們將類的信息也編碼到緩存鍵里去:
const cache = {}export function idempotent() {- console.log("making ", " idempotent")return function (target, propertyKey, descriptor) {- console.log("idempotent called: ", target, propertyKey, descriptor)- console.log("target.propertyKey = ", target[propertyKey].toString())-+ const cacheKey = `${target.constructor}.${propertyKey}`const originalMethod = descriptor.valuedescriptor.value = () => {- console.log("cache = ", cache)- if (typeof cache[propertyKey] === "undefined") {- cache[propertyKey] = originalMethod()+ if (typeof cache[cacheKey] === "undefined") {+ cache[cacheKey] = originalMethod()}- return cache[propertyKey]+ return cache[cacheKey]}-- console.log("target.propertyKey now = ", target[propertyKey].toString())}}
測試通過,提交代碼。
再次審視,我們需要將對象的信息編碼進入緩存鍵中,不然,同一個類下的不同對象之間也會出現混淆,這是一個后面的優化點。
繼續增強——支持參數以上的實現版本,冪等裝飾器是一個不帶參數的函數。這次再增強一下,允許傳入一個函數作為冪等裝飾器的參數,該函數接收裝飾目標方法的參數為參數,并返回一個鍵值,成為緩存鍵的一部分。整個過程就不啰嗦了,為了測試這些場景,新的測試文件內容如下:
import { idempotent } from "@/utils/idempotent/idempotent"describe("idempotent", () => {describe("idempotent without key", () => {let count = 0class TestingClass {@idempotent()async testMethod() {return count++}}it("a function without idempotent annotation would be called multiple times", async () => {let c = 0const testFunction = jest.fn().mockImplementation(() => c++)testFunction()testFunction()expect(testFunction).toHaveBeenCalledTimes(2)expect(c).toEqual(2)})it("a function with idempotent annotation would only be called once", async () => {const sut = new TestingClass()await Promise.all([sut.testMethod(), sut.testMethod()])expect(count).toEqual(1)})})describe("idempotent with key", () => {class TestingClass {@idempotent((obj) => obj.id)testMethod(obj: { id: string; count: number }) {obj.count++return obj.count}}it("calls testMethod multiple times, only the 1st one takes effect", async () => {const sut = new TestingClass()const obj1 = { id: "1", count: 0 }const obj2 = { id: "2", count: 0 }sut.testMethod(obj1)sut.testMethod(obj1)sut.testMethod(obj2)expect(obj1.count).toEqual(1)expect(obj2.count).toEqual(1)})})})
其實,主要的測試意圖就是,多次調用一個方法,如果該方法沒有冪等修飾符,那么該方法帶來的影響是多次執行;而如果這個方法有冪等修飾符呢,其效果是只有第一次是真正執行了,后續的執行被短路了。然后,分別考慮這個方法接收參數與不接收參數的場景,不接收參數,該方法至多只會被執行一次;接收參數,對“同樣的”參數至多只執行一次。但是這個“同樣”的涵義,是在寫修飾符時定義的。也就是說,這個修飾符自己也接受一個參數,用來定義這個“同樣”。比如根據參數的某個唯一屬性決定,或者自行實現一個哈希值進行比對,都可以。
滿足這樣的測試的裝飾器實現如下:
import * as crypto from "crypto"import { sleep } from "@/common"const inMemoryStorage = {executedMethods: {},returnValues: {},}export enum MethodStatus {pending = 0,done = 1,error = 2,}export interface IIdempotentStorage{get: (hash: string) => PromisesaveReturnValuesIfNotExecuted: (hash: string, valueEvaluator: () => Promise) => Promise}export class HashDuplicationError extends Error {}export class OriginalMethodError extends Error {constructor(readonly originalError: Error) {super()}}export class InMemoryIdempotentStorageimplements IIdempotentStorage{async saveReturnValuesIfNotExecuted(hash: string, valueEvaluator: () => Promise) {if (inMemoryStorage.executedMethods[hash] === undefined) {inMemoryStorage.executedMethods[hash] = MethodStatus.pendingtry {inMemoryStorage.returnValues[hash] = await valueEvaluator()} catch (ex) {inMemoryStorage.executedMethods[hash] = MethodStatus.errorinMemoryStorage.returnValues[hash] = exthrow new OriginalMethodError(ex)}inMemoryStorage.executedMethods[hash] = MethodStatus.done}}async get(hash) {if (inMemoryStorage.executedMethods[hash] === MethodStatus.error) {throw new OriginalMethodError(inMemoryStorage.returnValues[hash])}if (inMemoryStorage.executedMethods[hash] !== MethodStatus.done) {await sleep(500)return await this.get(hash)}return inMemoryStorage.returnValues[hash]}}export function idempotent(hashFunc?, idempotentStorage: IIdempotentStorage= new InMemoryIdempotentStorage()) {return function (target, propertyKey, descriptor) {const cachePrefix = `${crypto.createHash("md5").update(target.constructor.toString()).digest("hex")}.${propertyKey}`const originalMethod = descriptor.valuedescriptor.value = async function (...args) {const hash = hashFunc ? hashFunc(...args) : ""const cacheKey = `${cachePrefix}:${hash}`const fallback = async () => await originalMethod.call(this, ...args)const idempotentOrFallback = async () =>await Promise.race([idempotentStorage.get(cacheKey),new Promise((resolve, reject) => setTimeout(reject, 30000)),]).catch(fallback)try {await idempotentStorage.saveReturnValuesIfNotExecuted(cacheKey, fallback)} catch (ex) {// if it"s duplicate error, wait for done and then getif (ex instanceof HashDuplicationError) {return await idempotentOrFallback()} else if (ex instanceof OriginalMethodError) {throw ex.originalError} else {console.error(ex)return await fallback()}}return await idempotentOrFallback()}}}
這中間跳躍比較大了,實際情況并不是一步到位的。有一個改動比較明顯,即裝飾器函數變復雜了。其中 descriptor.value 不再用箭頭表達式,而是用了 function。這是為了利用 this,保證如果目標方法依賴類中的其他屬性或者成員,在被裝飾器改寫后,仍然可以照常使用,而不會報 某某方法、成員、或者屬性在 undefined 中不存在等之類的錯誤。
還可以看到,內存緩存不再是一個對象,而是包括了兩個對象的對象。這是由于考慮到異步函數,存在方法正在執行但是返回結果還沒有拿到的情況,所以增加了executedMethods做為了一個互斥鎖。并且將裝飾器的依賴,顯式使用接口 IIdempotentStorage 說明,不再隱式依賴內存緩存。同時,使用 class 方式實現了使用內存緩存的冪等存儲接口 IIdempotentStorage 。
這個接口設計上只有兩個方法,即 saveReturnValuesIfNotExecuted 和 get。get 顯然就是用來獲取緩存值的,并且保證獲取到值,如果裝飾目標函數正在運行,值還沒有拿到,這個 get 不會返回,也不會返回空,而是會等待一段時間,再去獲取,直到獲取到值。若因為某種原因一直拿不到返回值,最終這個裝飾目標方法會報超時錯誤,這個邏輯見裝飾器的代碼。
這個接口的另一方法,叫 saveReturnValuesIfNotExecuted,會被裝飾后的方法首先執行。這個方法名很長,正如這個名字暗示的,只會在第一次執行原方法時保存返回值到存儲中。這個方法,在執行原方法時,需要先檢查是不是已經有一個實例在執行中了,即需要先拿到一個互斥鎖。所以會在 InMemoryIdempotentStorage看到對之前說的 executedMethods進行檢查。由于是內存存儲,通過這個鎖來防止對原方法的重復調用是簡單且有效的,在后面增加非內存存儲時,就需要利用別的機制了,會更復雜一些。
版本二:支持多個服務實例在版本一實現后,非常清楚這個方式解決問題的關鍵在于需要一個緩存存儲。內存版本只能支持一個服務實例,要支持多個服務實例,我們必須找到一個外部存儲,這個外部存儲可以是 Redis、也可以是其他數據庫。本文采用了 DynamoDb 作為緩存存儲,因為該項目已經引用了 AWS 的 DynamoDb,并且沒有使用 Redis,所以繼續沿用不需要增加依賴。如果使用 Redis 的話,可以考慮使用 RedLock 這個庫,它應該是利用了 Redis 的分布式鎖功能。據說 DynamoDb 也有分布式鎖方案,但是本文沒有采用分布式鎖,而且利用了數據庫的唯一約束,完成了冪等功能,詳見后面的敘述。
測試先行既然決定了使用 DynamoDb,那么有個挑戰就是如果在測試時,排除 DynamoDb 這個外部依賴。好在之前有文章《掃清 Cloud Native 開發時的 TDD 障礙》已經部分解決了這個挑戰,即通過 Mockery 將 AWS SDK 中的方法用 jest.fn() 去模擬掉了,但是,本篇文章的需求,要使用更多的 AWS SDK 中的方法,所以需要在那篇文章的基礎上,增加更多的模擬。
主要是,在實現接口的 saveReturnValuesIfNotExecuted方法時,需要利用數據庫的唯一約束,在多次寫入同一鍵值時,能夠讓數據庫報錯。這里使用了 AWS DynamoDb 的 transactWriteItems方法,在測試中,需要將它模擬掉:
export const mockDynamoDB = {+ transactWriteItems: jest.fn().mockImplementation((params: DynamoDB.Types.TransactWriteItemsInput) => {+ return {+ promise: () => {+ const hash = params.TransactItems[0].Put?.Item["hash"].S++ if (!hash) {+ return Promise.reject("hash empty!")+ }++ if (!db[hash]) {+ db[hash] = params.TransactItems[0].Put?.Item+ return Promise.resolve()+ } else {+ return Promise.reject("duplicated!")+ }+ },+ }+ }),+ describeTable: jest.fn().mockImplementation(({ TableName }) => {+ return {+ promise: () => {+ return Promise.resolve({ TableName })+ },+ }+ }),createTable: jest.fn().mockImplementation(() => {return {
打通了這個自動化測試障礙,就可以寫測試用例了。主要的測試目的,就是驗證當我們實現了基于 DynamoDb 的冪等存儲后,如果嘗試多次調用 saveReturnValuesIfNotExecuted方法,只有第一次的調用能夠成功,而重復的調用應該拋錯,并且 get只會取到第一次存儲的值。
import { mockAwsSdk } from "../../../test/mocks/aws"jest.mock("aws-sdk", () => mockAwsSdk)import { DynamodbIdempotentStorage } from "@/utils/idempotent/dynamodb.idempotent.storage"import { AWSAdapterService } from "@/common/adapters/aws"import { HashDuplicationError } from "@/utils/idempotent/idempotent"describe("dynamodb.idempotent.storage", () => {it("throws HashDuplicationError when saving duplicate hash record", async () => {const dynamodbStorage = new DynamodbIdempotentStorage(new AWSAdapterService())await dynamodbStorage.saveReturnValuesIfNotExecuted("1234", async () => {return "hello"})await expect(async () => {await dynamodbStorage.saveReturnValuesIfNotExecuted("1234", async () => {return "world2"})}).rejects.toThrow(HashDuplicationError)const res = await dynamodbStorage.get("1234")expect(res).toStrictEqual("hello")})})基于 DynamoDb 的冪等存儲
也不啰嗦,最后的實現大致是這樣的:
import {HashDuplicationError,IIdempotentStorage,MethodStatus,OriginalMethodError,} from "@/utils/idempotent/idempotent"import { BaseDynamoService } from "@/common/base"import { Expose, plainToClass } from "class-transformer"import { DynamoDB } from "aws-sdk"import { sleep } from "@/common"export class IdempotentCache {@Expose()hash: string@Expose()status: MethodStatus@Expose()returnValue: string@Expose()ttl: number}const getTtl = () => Math.floor(new Date().getTime() / 1000 + 3600 * 24)export class DynamodbIdempotentStorageextends BaseDynamoServiceimplements IIdempotentStorage{async get(hash: string): Promise{const record = await this.getItem({TableName: this.table,Key: {hash: { S: hash },},})if (record && record.status.toString() === MethodStatus.error.toString()) {throw new OriginalMethodError(new Error(record.returnValue))}if (!record || record.status.toString() !== MethodStatus.done.toString()) {console.log("record of ", hash, " = ", record)await sleep(500)return await this.get(hash)}return record?.returnValue ? JSON.parse(record?.returnValue) : undefined}async saveReturnValuesIfNotExecuted(hash: string, valueEvaluator: () => Promise): Promise{await this.ensureTable(this.table)try {await this.transactionalWriteItems({TransactItems: [{Put: {TableName: this.table,ConditionExpression: "attribute_not_exists(#hsh)",ExpressionAttributeNames: { "#hsh": "hash" },Item: this.toAttributeMap({hash,status: MethodStatus.pending,returnValue: "",ttl: getTtl(),}),},},],})} catch (ex) {console.error(ex)throw new HashDuplicationError(ex.message)}let returnValuetry {returnValue = await valueEvaluator()} catch (ex) {const item = this.toAttributeMap({hash,status: MethodStatus.error,returnValue: ex.message,ttl: getTtl(),})await this.putItem({TableName: this.table,Item: item,})throw new OriginalMethodError(ex)}const item = this.toAttributeMap({hash,status: MethodStatus.done,returnValue: JSON.stringify(returnValue),ttl: getTtl(),})await this.putItem({TableName: this.table,Item: item,})}protected getTableConfig(): Partial{return {TableName: this.table,AttributeDefinitions: [{AttributeName: "hash",AttributeType: "S",},],KeySchema: [{AttributeName: "hash",KeyType: "HASH",},],}}protected toAttributeMap(record: IdempotentCache): DynamoDB.AttributeMap {return {hash: this.toS(record.hash),status: this.toS(record.status),returnValue: this.toS(record.returnValue),ttl: this.toN(record.ttl),}}protected toInstance(item: DynamoDB.AttributeMap): IdempotentCache {return plainToClass(IdempotentCache, {hash: this.toValue(item.hash),status: this.toValue(item.status),returnValue: this.toValue(item.returnValue),ttl: this.toValue(item.ttl),})}protected getTTLConfig(): Partial| null {return {TimeToLiveSpecification: {Enabled: true,AttributeName: "ttl",},}}}
這個實現,依賴了一個 BaseDynamoService,關于它的更多信息,見之前的《強行在 TypeScript 里應用 C# 的 partial class》,其實就是對 Aws Sdk 中的 DynamoDb 做了一些封裝。
另外,利用了 DynamoDb 的 ttl 機制,只緩存一天的數據。
關于前面和這里反復用到的 sleep 函數,相信你分分鐘就能寫一個吧,不做贅述。
測試通過,提交代碼。
在真實的 AWS 環境里測試盡管測試愉快地通過了,但那都是基于我們模擬的環境。如果放在真實的 AWS 環境,它真的會如期工作嗎?真的會!
這里分享一下從本地連接真實的 AWS 環境進行測試的技巧。首先,需要安裝 aws 命令行,登錄之后,你可以 cat ~/.aws/config 看到一些關鍵信息,如:
[default]aws_access_key_id = xxxxaws_secret_access_key = yyyaws_session_token = zzzoutput = jsonregion = cn-northwest-1
要在跑測試時,通過以上信息連接到真實的 AWS 環境,需要將環境變量 AWS_SDK_LOAD_CONFIGx0;設置為 1,于是要么:
AWS_SDK_LOAD_CONFIG=1yarntest
要么,在測試文件頂部加入:
process.env["AWS_SDK_LOAD_CONFIG"]="1"
并且刪除之前的對 mock 的執行:
+ process.env["AWS_SDK_LOAD_CONFIG"] = "1"- import { mockAwsSdk } from "../../../test/mocks/aws"- jest.mock("aws-sdk", () => mockAwsSdk)ADFS?
如果你的 AWS 登錄采用了 adfs,那么推薦使用 https://github.com/Jeff-Tian/aws-adfs-auth ,讓你可以在命令行里直接登錄 AWS。詳見使用教程見其 README。
使用基于 DynamoDb 的冪等存儲到了這一步,我們已經走了很遠?,F在回頭來解決最初的問題。在簡要分析與修復想法中,我們希望只通過添加一個冪等修飾符,不改其他代碼,就修復掉重復發消息的問題。于是最終的代碼改動如下:
+ @idempotent((post) => post.postId, new DynamodbIdempotentStorage())public async handlePost(post: PostRecord): Promise{
上線后,再也沒有出現用戶收到重復消息的問題了。
思考與總結為什么沒有使用 memoize 模式?memoize 是我很喜歡的一個模式,它不僅也讓不純的函數變得純潔,而且實現起來非常簡潔。在之前的文章中我一再提到它:
閉包的妙用 —— memoize
閉包的妙用,再次以 memoize 舉例
屢試不爽的 memoize
但是這次沒有采用,因為它也是利用內存做為緩存,更適合只有一個實例的場景,比如用在前端就很好。但是基于要用到數據庫的原因,就沒有采用它。
發布 npm 包如果后面再發現其它項目中也需要用它,或者本文點贊數過千,那說明這個裝飾器很有復用價值,到那時再發布成一個 npm 包。
相關閱讀
-
微速訊:冪等 —— 讓函數再次純潔
冪等——讓函數再次純潔純潔地做人,是一種美德。寫純函數,是程序... -
【第28題】console的異步性怎么理解?
面試題目(字節):console的異步性怎么理解?答案解析:console我一... -
當前快播:“趣味運動會項目”教學思路
說在前面浙教版《選擇性必修一數據和數據結構》第一章是全書的導論... -
環球實時:“數據合并”教學思路
說在前面浙教版《選擇性必修一數據和數據結構》第一章是全書的導論... -
環球速訊:因為一篇文章,發生了一大堆...
前段時間寫了一篇文章,給ShardingSphere提了個PR,不知道是不是嫌... -
快報:用 UE 虛幻引擎做個捏臉小功能~~
最近在學習UE相關的使用,正好看到一篇文章講解用ControlRig實現簡...