Skip to content

Commit

Permalink
backend:read-from-master-db-replica-failover
Browse files Browse the repository at this point in the history
  • Loading branch information
vijaygupta18 committed Sep 18, 2024
1 parent 278b677 commit 15e04c2
Showing 1 changed file with 39 additions and 18 deletions.
57 changes: 39 additions & 18 deletions lib/mobility-core/src/Kernel/Beam/Functions.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import Database.Beam hiding (timestamp)
import Database.Beam.MySQL ()
import Database.Beam.Postgres
import qualified EulerHS.KVConnector.Flow as KV
import EulerHS.KVConnector.Types (DBCommandVersion (..), KVConnector (..), MeshConfig (..), MeshMeta, TableMappings)
import EulerHS.KVConnector.Types (DBCommandVersion (..), KVConnector (..), MeshConfig (..), MeshError (MDBError), MeshMeta, MeshResult, TableMappings)
import EulerHS.KVConnector.Utils
import qualified EulerHS.Language as L
import EulerHS.Types hiding (Log, V1)
Expand Down Expand Up @@ -190,6 +190,13 @@ getReplicaLocationDbConfig = do
Just dbCnf' -> pure dbCnf'
Nothing -> L.throwException $ InternalError "Replica LocationDB Config not found"

getReadDBConfigInternal :: (HasCallStack, L.MonadFlow m) => Text -> m (DBConfig Pg)
getReadDBConfigInternal modelName = do
tables <- L.getOption KBT.Tables
let dbConfig = maybe getReplicaDbConfig (\tables' -> if modelName `elem` tables'.readFromMasterDb then getMasterDBConfig else getReplicaDbConfig) tables
isMasterReadEnabled <- L.getOptionLocal MasterReadEnabled
maybe dbConfig (\isMasterReadEnabled' -> if isMasterReadEnabled' then getMasterDBConfig else getReplicaDbConfig) isMasterReadEnabled

type BeamTableFlow table m =
( HasCallStack,
BeamTable table,
Expand All @@ -207,6 +214,22 @@ type BeamTable table =
Show (table Identity)
)

--- db failover function
runWithMasterDbIfReplicaFails ::
forall m a.
L.MonadFlow m =>
m (MeshResult a) ->
m (MeshResult a)
runWithMasterDbIfReplicaFails action = do
res <- action
case res of
Left (MDBError (DBError errType _)) | errType `elem` [ConnectionFailed, ConnectionDoesNotExist] -> do
L.setOptionLocal MasterReadEnabled True
result <- action
L.setOptionLocal MasterReadEnabled False
pure result
_ -> pure res

-- findOne --

findOneWithKV ::
Expand Down Expand Up @@ -310,8 +333,9 @@ findAllWithKVAndConditionalDB ::
m [a]
findAllWithKVAndConditionalDB where' orderBy = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithKVAndConditionalDBInternal dbConf' updatedMeshConfig where' orderBy
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithKVAndConditionalDBInternal dbConf' updatedMeshConfig where' orderBy
case result of
Right res -> do
res' <- mapM fromTType' res
Expand Down Expand Up @@ -348,8 +372,9 @@ findAllWithOptionsKV' ::
m [a]
findAllWithOptionsKV' where' mbLimit mbOffset = do
updatedMeshConfig <- setMeshConfig (modelTableName @table) (modelSchemaName @table) meshConfig
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithOptionsKVConnector' dbConf' updatedMeshConfig where' mbLimit mbOffset
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithOptionsKVConnector' dbConf' updatedMeshConfig where' mbLimit mbOffset
case result of
Right res -> do
res' <- mapM fromTType' res
Expand Down Expand Up @@ -466,8 +491,9 @@ findOneInternal ::
Where Postgres table ->
m (Maybe a)
findOneInternal updatedMeshConfig fromTType where' = do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findWithKVConnector dbConf' updatedMeshConfig where'
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findWithKVConnector dbConf' updatedMeshConfig where'
logQueryData "findOneInternal" (show $ getFieldsAndValuesFromClause meshModelTableEntityDescriptor (And where')) ("Nothing" :: Text) (show result) (meshEnabled updatedMeshConfig) (modelTableName @table)
case result of
Right (Just res) -> fromTType res
Expand All @@ -482,8 +508,9 @@ findAllInternal ::
Where Postgres table ->
m [a]
findAllInternal updatedMeshConfig fromTType where' = do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithKVConnector dbConf' updatedMeshConfig where'
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithKVConnector dbConf' updatedMeshConfig where'
logQueryData "findAllInternal" (show $ getFieldsAndValuesFromClause meshModelTableEntityDescriptor (And where')) ("Nothing" :: Text) (show result) (meshEnabled updatedMeshConfig) (modelTableName @table)
case result of
Right res -> do
Expand All @@ -502,22 +529,16 @@ findAllWithOptionsInternal ::
Maybe Int ->
m [a]
findAllWithOptionsInternal updatedMeshConfig fromTType where' orderBy mbLimit mbOffset = do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
result <- KV.findAllWithOptionsKVConnector dbConf' updatedMeshConfig where' orderBy mbLimit mbOffset
result <- runWithMasterDbIfReplicaFails $ do
dbConf' <- getReadDBConfigInternal (modelTableName @table)
KV.findAllWithOptionsKVConnector dbConf' updatedMeshConfig where' orderBy mbLimit mbOffset
logQueryData "findAllWithOptionsInternal" (show $ getFieldsAndValuesFromClause meshModelTableEntityDescriptor (And where')) ("Nothing" :: Text) (show result) (meshEnabled updatedMeshConfig) (modelTableName @table)
case result of
Right res -> do
res' <- mapM fromTType res
pure $ catMaybes res'
Left err -> throwError $ InternalError $ show err

getReadDBConfigInternal :: (HasCallStack, L.MonadFlow m) => Text -> m (DBConfig Pg)
getReadDBConfigInternal modelName = do
tables <- L.getOption KBT.Tables
let dbConfig = maybe getReplicaDbConfig (\tables' -> if modelName `elem` tables'.readFromMasterDb then getMasterDBConfig else getReplicaDbConfig) tables
isMasterReadEnabled <- L.getOptionLocal MasterReadEnabled
maybe dbConfig (\isMasterReadEnabled' -> if isMasterReadEnabled' then getMasterDBConfig else getReplicaDbConfig) isMasterReadEnabled

updateInternal ::
forall table m r.
(BeamTableFlow table m, EsqDBFlow m r) =>
Expand Down

0 comments on commit 15e04c2

Please sign in to comment.