Commits on Source (8)
#!/bin/sh
set -e
stack build --test
stack exec hlint -- -g || true
module RD.CliVersion (cliVersion) where
cliVersion :: String
cliVersion = "1.5.0.1"
cliVersion = "1.6.0.0"
......@@ -19,8 +19,12 @@ import RD.Types
-- | convert byte number to MiB. sizes below 1 MiB are shown as "<1.0 MiB".
humanReadableSize :: Integer -> T.Text
humanReadableSize bytes = sformat (fixed 1 % " MiB")
(fromInteger bytes / (1048576 :: Double))
humanReadableSize bytes =
let bytesDouble = fromInteger bytes / (1048576 :: Double) in
if bytesDouble < 1.0 then
"<1.0 MiB"
else
sformat (fixed 1 % " MiB") bytesDouble
-- | get sha1sum hex string for given bytes
sha1sumOnBytes :: LB.ByteString -> LB.ByteString
......
......@@ -31,6 +31,7 @@ instance Show FileStatus where
show FileStatusDone = "done"
show FileStatusUnknown = "unknown"
-- | convert FileStatus to ByteString
fsBytes :: FileStatus -> B.ByteString
fsBytes = Char8.pack . show
......
* COMMENT -*- mode: org -*-
#+Date: 2018-05-04
Time-stamp: <2024-04-25>
Time-stamp: <2024-04-26>
#+STARTUP: content
* notes :entry:
** 2022-03-14 project dir structure
......@@ -29,6 +29,7 @@ Time-stamp: <2024-04-25>
./README.md
./pypi/rd-api/README.rst
./pypi/rd-client/README.rst
- build wheel and test it in production server
make dist -C pypi
......@@ -168,6 +169,8 @@ this should return json of the block metadata.
set <filepath>_blockSize_status working|done
set <filepath>_sha1 SHA1
- worker pool is there for calculating all blocks for one file.
fileQueue
......@@ -492,6 +495,130 @@ via env var and command line parameter.
* current :entry:
**
** 2024-03-12 rd-api, if the file is already transferred block by block, I can support
live compress easily. If the client requests compress as a param, such as
?compress=zstd. default is no compress.
- when the source file is just tar, not a compressed format like squashfs or
  zip, this can speed up transfer by deferring compression.
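a rough sketch of the param handling described above (not rd-api code; the
names and the injected zstd compressor are hypothetical):
#+begin_src haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Lazy as LB
import qualified Data.Text as T

data Compression = NoCompression | Zstd deriving (Show, Eq)

-- | map the optional ?compress= query param to a compression mode.
parseCompression :: Maybe T.Text -> Either T.Text Compression
parseCompression Nothing       = Right NoCompression   -- default is no compress
parseCompression (Just "zstd") = Right Zstd
parseCompression (Just other)  = Left ("unsupported compress param: " <> other)

-- | apply the chosen compression to a response body. the actual zstd binding
-- (e.g. from a zstd package) is passed in rather than assumed here.
applyCompression :: (LB.ByteString -> LB.ByteString) -> Compression
                 -> LB.ByteString -> LB.ByteString
applyCompression _        NoCompression body = body
applyCompression compress Zstd          body = compress body
#+end_src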
** 2024-03-12 rd, add a DL remaining time estimate, based on estimated DL speed.
each block has a block size. I know when it is started and when it is
finished. I know how many more blocks to fetch. it should be easy to
estimate. calculate a moving avg speed using the last 5 blocks DLed.
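a minimal sketch of that estimate (hypothetical names, not existing rd client
code): keep (bytes, seconds) for recently finished blocks and divide the
remaining bytes by a moving average speed over the last 5 of them.
#+begin_src haskell
-- | bytes per second over the last 5 downloaded blocks (newest first).
movingAvgSpeed :: [(Integer, Double)] -> Double
movingAvgSpeed samples =
  let recent = take 5 samples
      bytes  = sum (map fst recent)
      secs   = sum (map snd recent)
  in if secs <= 0 then 0 else fromInteger bytes / secs

-- | estimated seconds remaining; Nothing when there is no usable speed sample yet.
estimateRemaining :: [(Integer, Double)] -> Integer -> Maybe Double
estimateRemaining samples remainingBytes =
  let speed = movingAvgSpeed samples
  in if speed <= 0 then Nothing
     else Just (fromInteger remainingBytes / speed)
#+end_src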
** 2018-05-09 test the app under unstable network.
I remember there are tools that can simulate packet loss.
policy in ovs can do it.
- 2018-05-11 tcp - Simulate delayed and dropped packets on Linux - Stack Overflow
https://stackoverflow.com/questions/614795/simulate-delayed-and-dropped-packets-on-linux
test this in vbox VM. stretch01
see stretch01 daylog.
* done :entry:
** 2024-04-26 log refine.
- DONE 2024-04-26T19:06:17 I fileWorker done for /home/sylecn/d/t2/foo, 0.0 MiB, 1 blocks
don't use 0.0MiB, just say <1MiB or so.
humanReadableSize
add unit test for this.
- DONE 2024-04-26T19:12:02 I fileWorker is waiting for jobs...
2024-04-26T19:12:02 I fileWorker is waiting for jobs...
fileWorker-<N> is waiting for jobs
2024-04-26T19:12:02 I fileWorker working on /home/sylecn/d/t2/foo
2024-04-26T19:12:02 I fileWorker done for /home/sylecn/d/t2/foo, 0.0 MiB, 1 blocks
fileWorker-<N> working on ...
in startWorkers, give it a name.
-
** 2024-04-07 rd-api: when a file content is changed on disk, auto invalidate all
cached blocks.
- when the user requests rd metadata and the file's mtime has changed, do a sha1sum on
the first 4M and last 4M of the file. if either of these sha1sums changes,
invalidate the cache in redis.
when calculating metadata, cache the file's mtime and the sha1sums of the first 4M
and last 4M of the file's content.
2024-04-25 is it easy to get the last 4M? yes.
as in C, seek to length-4M and read 4M (see the sketch below).
from a correctness point of view, this is not enough. the user could modify the
center part of the file. How about always recalculating if mtime changed?
how about this? just do it for small files, use the full content hash as the
fingerprint. For large files, the user is supposed to rename the file or clear the
file metadata cache. If so, the user can always rename the file before running rd
again. just giving a warning will be enough.
design:
auto calculate and check fingerprint for small files (<200M), skip
fingerprint check for large files.
- this will allow DLing the correct file when the file content for the same file name
has changed.
- give some log in console when file content changes.
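sketch of the "first 4M / last 4M" idea from the bullet above (this is not the
code that was eventually shipped, which hashes the full content of small files;
the helper name is hypothetical):
#+begin_src haskell
import System.IO (IOMode(ReadMode), withBinaryFile, hFileSize, hSeek,
                  SeekMode(AbsoluteSeek))
import qualified Data.ByteString as B

-- | read the last chunkSize bytes of a file by seeking to size - chunkSize.
-- reads fewer bytes if the file is smaller than chunkSize.
lastChunk :: Int -> FilePath -> IO B.ByteString
lastChunk chunkSize path =
  withBinaryFile path ReadMode $ \h -> do
    size <- hFileSize h
    hSeek h AbsoluteSeek (max 0 (size - fromIntegral chunkSize))
    B.hGet h chunkSize

-- usage: lastChunk (4 * 1024 * 1024) "/path/to/file" >>= print . B.length
#+end_src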
*** auto calculate and check fingerprint for small files (<200M),
skip fingerprint check for large files.
- implement this now.
this change should be backward compatible with older rd clients.
it only affects server-side metadata caching.
- code in getRdHandler
resultE <- liftIO $ runExceptT $ processNewFileAsyncMaybe rc fbp
processNewFileAsyncMaybe :: RDRuntimeConfig -> FillBlockParam -> ExceptT T.Text IO ()
resultE <- liftIO $ DB.insertIfNotExist rc strKey $ fsBytes FileStatusWorking
throwOnLeft resultE
here, if the file's sha1 has changed, use DB.set instead of DB.insertIfNotExist.
- add proper logging.
- create a unit test for this?
it's all IO actions, maybe I will test it myself.
on agem10,
stack exec -- rd-api -d ~/d/t2/
create ~/d/t2/foo with some content
stack exec -- rd http://127.0.0.1:8082/foo
- problems
- if I send the job to the worker, will the old redis key be rewritten?
will it ever return expired cache data?
it will call this:
blocksWithSha1sum <- liftIO $ fillSha1sum rc fbp
I need to delete the old redis key.
- test the new code.
when foo content changed.
it works.
** 2024-04-25 haskell: when only executable code changes, should I increase the version in package.yaml?
I think I will defer the version change to avoid re-compilation of the base library.
** 2024-04-07 when combining blocks to create final file, don't print
"No block fetched in last 10 seconds" log any more if there is no other files
in DL list.
** 2024-04-24 try build this project in gocd.
- build in debian stretch agent node.
- build in docker is fine.
......@@ -756,51 +883,28 @@ set.AGENT_STARTUP_ARGS=-Xms256m -Xmx1024m
I see. in Makefile the path is not build/rd-api.
it's ../../build/rd-api
- in setup.py python3-setuptools is required.
I think I should have a python3 build image and run the package commands
in that docker image instead of rebuilding go-agent image too frequently.
use pre-built twine docker image maybe.
I really hate docker-in-docker though.
maybe just don't run go-agent in docker.
that will fix everything.
I can manage OS env using salt easily.
but I also don't want to install compilers in production system.
maybe run it in VMs?
recreate docker image is not slow when change is not big.
give it another try.
it works now.
on next release, I can use gocd to build and deploy to PyPI.
** 2024-04-07 when combining blocks to create final file, don't print
"No block fetched in last 10 seconds" log any more if there is no other files
in DL list.
** 2024-04-07 rd-api: when a file content is changed on disk, auto invalidate all
cached blocks.
- when user request rd metadata, and file's mtime changed, do a sha1sum on
first 4M and last 4M of the file. if any of these sha1sum changes,
invalidate cache in redis.
when calculate metadata, cache file's mtime, first 4M and last 4M of the
file's content's sha1sum.
- this will allow DL the correct file when file content for the same file name
is changed.
- give some log in console when file content changes.
-
** 2024-03-12 rd-api, if file is already transferred block by block, I can support
live compress easily. If the client request compress as param such as
?compress=zstd. default is no compress.
- when the source file is just tar, not compressed format like squashfs or
zip, this can speed up transfer by delaying compress.
** 2024-03-12 rd, add an DL remaining time estimate, based on estimated DL speed.
each block has a block size. I know when it is started and when it is
finished. I know how many more blocks to fetch. it should be easy to
estimate. calculate a moving avg speed using the last 5 blocks DLed.
** 2022-03-15 rd client, is there a built-in repeat/loop function?
IO () -> IO ()
I should not need to write showProgressLoop explicitly.
** 2018-05-09 test the app under unstable network.
I remember there are tools that can simulate packet loss.
policy in ovs can do it.
- 2018-05-11 tcp - Simulate delayed and dropped packets on Linux - Stack Overflow
https://stackoverflow.com/questions/614795/simulate-delayed-and-dropped-packets-on-linux
test this in vbox VM. stretch01
see stretch01 daylog.
* done :entry:
** 2019-02-28 bug: rd-api -d java/
option -d: cannot parse value `java/'
......@@ -2577,6 +2681,13 @@ I can use haskell fork though.
yes. see Route Patterns.
* wontfix :entry:
** 2022-03-15 rd client, is there a built-in repeat/loop function?
IO () -> IO ()
I should not need to write showProgressLoop explicitly.
- 2024-04-26 I think recursion is good.
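for reference, Control.Monad.forever is the built-in repeat combinator, but it
never returns, so a loop that has to stop (like showProgressLoop once all
blocks are fetched) still wants explicit recursion. tiny illustration, not
project code:
#+begin_src haskell
import Control.Monad (forever)
import Control.Concurrent (threadDelay)

-- | print a message once per second, forever. forever :: Applicative f => f a -> f b
tick :: IO ()
tick = forever $ do
  putStrLn "tick"
  threadDelay 1000000  -- one second
#+end_src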
** 2024-04-24 try build this project using nix with flake.
- create flake.nix.
https://www.tweag.io/blog/2020-05-25-flakes/
......
name: reliable-download
# do not modify version if lib/ code is not changed.
# real app version: 1.3.2.0
version: 1.2.6.0
synopsis: provide reliable download service via HTTP
description: reliable-download web application and cli tool
......
......@@ -97,6 +97,11 @@ https://gitlab.emacsos.com/sylecn/reliable-download
ChangeLog
---------
* v1.6.0.0 2024-04-26
- feature: auto-detect file content changes for small files; auto-invalidate the metadata cache if the file has changed.
- feature: refine fileWorker logs
* v1.5.0.0 2024-04-08
- bugfix: properly handle unicode string in URL path
......
......@@ -72,6 +72,10 @@ https://gitlab.emacsos.com/sylecn/reliable-download
ChangeLog
---------
* v1.6.0.0 2024-04-26
- bugfix: don't print "No block fetched in last N seconds" when all blocks are fetched.
* v1.5.0.0 2024-04-08
- bugfix: properly handle unicode string in URL path
......
......@@ -8,6 +8,8 @@ import Control.Concurrent.Chan
import System.IO.Error (catchIOError)
import Control.Monad (unless)
import qualified Data.Text as T
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Network.Wai (Application, pathInfo)
import Web.Scotty
......@@ -22,7 +24,7 @@ import qualified Data.HashMap.Strict as M
import RD.CliVersion (cliVersion)
import RD.Types
import RD.Server.Config
import RD.Lib (sha1sum, genBlocks)
import RD.Lib (sha1sum, sha1sumOnBytes, genBlocks)
import RD.Utils
import qualified RD.Server.DB as DB
......@@ -43,19 +45,77 @@ fillSha1sum rc fbp = do
fillBlock :: Block -> BlockWithChecksum
fillBlock (blockId, start, end) = (blockId, start, end, decodeUtf8 $ M.lookupDefault "pending" (blockIdKey blockId) blockIdSha1sumMap)
-- | a sha1sum variant with different types (FilePath in, strict ByteString out),
-- used for storing the sha1sum in redis. only used in isNewFile.
sha1sumByteString :: FilePath -> IO B.ByteString
sha1sumByteString filename = do
bytes <- LB.readFile filename
return $ B.toStrict $ sha1sumOnBytes bytes
-- | try to set the file's status to FileStatusWorking if it has not been set before.
trySetFileToWorkingStatus :: RDRuntimeConfig -> B.ByteString -> IO (Either T.Text Bool)
trySetFileToWorkingStatus rc strKey = do
DB.insertIfNotExist rc strKey $ fsBytes FileStatusWorking
-- | return True if the given file and block size is a new combination. if the file
-- content has changed, it is always considered a new file and the old cache will be
-- purged. when this function returns Right True, it will set the working flag in
-- redis for the file name and block size combination.
isNewFile :: RDRuntimeConfig -> FillBlockParam -> IO (Either T.Text Bool)
isNewFile rc fbp = do
let strKey = fileStatusKey fbp
fn = T.pack $ fbpFilepath fbp
-- units -o %15f --terse 100MiB byte
if fbpFileSize fbp < 104857600 then do
cachedSha1E <- DB.get rc (fileSha1Key fbp)
case cachedSha1E of
Left msg -> return $ Left msg
Right sha1Maybe -> do
currentSha1 <- sha1sumByteString $ fbpFilepath fbp
case sha1Maybe of
Just cachedSha1 -> do
if currentSha1 /= cachedSha1
then do
-- set progress to working
warnl rc $ "file sha1 changed, will recalculate block hashes: " <> fn
let hashKey = blockSha1sumHashKey fbp
redisReply <- R.runRedis (rcRedisConn rc) $ R.del [hashKey]
case redisReply of
Left reply -> do
errorl rc $ "invalidate blocks old cache key failed:\n\t" <> showt reply
return $ Left "invalidate blocks old cache key failed"
Right _ -> do
infol rc $ "invalidated blocks cache for " <> fn
_ <- DB.set rc (fileSha1Key fbp) currentSha1
resultE <- DB.set rc strKey $ fsBytes FileStatusWorking
case resultE of
Left _ -> return $ Left "set file status to working failed"
Right _ -> return $ Right True
else do
debugl rc $ "file sha1 not changed: " <> fn
trySetFileToWorkingStatus rc strKey
Nothing -> do
resultE <- DB.set rc (fileSha1Key fbp) currentSha1
case resultE of
Left msg -> return $ Left msg
Right _ -> trySetFileToWorkingStatus rc strKey
else do
debugl rc "file sha1 is not checked for large files"
trySetFileToWorkingStatus rc strKey
-- | given a FillBlockParam, if this file is new, send job to worker and mark
-- it as working. if there is an error, return IO Left.
processNewFileAsyncMaybe :: RDRuntimeConfig -> FillBlockParam -> ExceptT T.Text IO ()
processNewFileAsyncMaybe rc fbp = do
let strKey = fileStatusKey fbp
filePath = fbpFilepath fbp
resultE <- liftIO $ DB.insertIfNotExist rc strKey $ fsBytes FileStatusWorking
resultE <- liftIO $ isNewFile rc fbp
throwOnLeft resultE
let insertOk = fromRight' resultE
if insertOk then liftIO $ do
infol rc $ T.pack filePath <> " is a new file, sending task to worker"
writeChan (rcFileChan rc) fbp
return ()
infol rc $ T.pack filePath <> " is a new file, sending task to worker"
writeChan (rcFileChan rc) fbp
return ()
else do
oldStatusE <- liftIO $ do
debugl rc $ T.pack filePath <> " is not a new file"
......
......@@ -35,6 +35,7 @@ parseIntEnv key env =
[(i, [])] -> Right $ Just i
_ -> Left $ "failed to parse " <> T.pack key <> " from env variable: " <> T.pack s
{-# ANN updateRDConfigFromEnvPure ("HLint: ignore Use =<<" :: String) #-}
updateRDConfigFromEnvPure :: [(String, String)] -> RDConfig -> Either T.Text RDConfig
updateRDConfigFromEnvPure env c0 =
(\c -> Right $ maybe c (\h -> c { host=h }) (lookup "HOST" env)) c0 >>=
......
......@@ -94,3 +94,7 @@ blockIdKey = Char8.pack . show
-- new|working|done.
fileStatusKey :: FillBlockParam -> B.ByteString
fileStatusKey fbp = blockSha1sumHashKey fbp <> "_status"
-- | the redis string key used to store this file's sha1sum
fileSha1Key :: FillBlockParam -> B.ByteString
fileSha1Key fbp = encodeUtf8 $ T.pack (fbpFilepath fbp) <> "_sha1"
......@@ -3,7 +3,9 @@ module RD.Server.Worker (startWorkers, sha1sumFileRange, fileRange) where
import Control.Concurrent.Chan
import System.IO (IOMode(ReadMode), withBinaryFile)
import GHC.IO.Handle (Handle, hTell, hSeek, SeekMode(AbsoluteSeek))
import Control.Monad (when, replicateM_, forever)
import Control.Monad (when, forever)
import Data.Foldable (forM_)
import Control.Concurrent (forkIO)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
......@@ -38,14 +40,14 @@ sha1sumFileRange filepath start end = sha1sumOnBytes <$> fileRange filepath star
-- | a file worker fetches a FillBlockParam from fileChan, then calculates the sha1sum
-- for all blocks and writes the results to redis. finally it marks the file as done.
fileWorker :: RDRuntimeConfig -> IO ()
fileWorker rc = forever $ do
infol rc ("fileWorker is waiting for jobs..." :: T.Text)
fileWorker :: RDRuntimeConfig -> T.Text -> IO ()
fileWorker rc fileWorkerName = forever $ do
infol rc $ fileWorkerName <> " is waiting for jobs..."
fbp <- readChan (rcFileChan rc)
let filepath = fbpFilepath fbp
conn = rcRedisConn rc
-- calculate sha1sum for each block and write result to redis hash
infol rc $ "fileWorker working on " <> T.pack filepath
infol rc $ fileWorkerName <> " working on " <> T.pack filepath
results <- withBinaryFile filepath ReadMode $ \handle -> do
let hashKey = blockSha1sumHashKey fbp
mapM (calculateSha1ForBlock conn hashKey handle) (fbpBlocks fbp)
......@@ -57,8 +59,9 @@ fileWorker rc = forever $ do
Right _ -> do
debugl rc $ "Set file status to " <> showt resultStatus <> " for " <> T.pack filepath
infol rc $ sformat
("fileWorker done for " % stext % ", " % stext % ", " % int % " blocks")
(T.pack filepath) (humanReadableSize (fbpFileSize fbp)) (length (fbpBlocks fbp))
(stext % " done for " % stext % ", " % stext % ", " % int % " blocks")
fileWorkerName (T.pack filepath) (humanReadableSize (fbpFileSize fbp))
(length (fbpBlocks fbp))
return ()
where
-- | calculate sha1 for a single block. return IO True on success.
......@@ -94,4 +97,5 @@ startWorkers :: RDRuntimeConfig -> IO ()
startWorkers rc = do
let workerCount = fileWorkerCount $ rcConfig rc
infol rc $ "creating " <> showt workerCount <> " file worker(s)"
replicateM_ workerCount (forkIO $ fileWorker rc)
forM_ [1..workerCount]
(\n -> forkIO $ fileWorker rc ("fileWorker-" <> showt n))
......@@ -82,22 +82,31 @@ showProgressAllDone rc = do
("All urls downloaded. " % int % " files, " % int % " blocks.")
totalfilec totalblockc
-- | show progress if at least one new block is fetched since last time. Otherwise, give a hint there may be a DL hang.
-- | show progress if at least one new block is fetched since last
-- time. Otherwise, give a hint there may be a DL hang. when all blocks are
-- fetched, return -1
showProgressMaybe :: RDClientRuntimeConfig -> Int -> IO Int
showProgressMaybe rc lastDownloadedBlockCount = do
p <- readMVar (rdProgress rc)
let newDLBC = piDownloadedBlockCount p
if newDLBC > lastDownloadedBlockCount then do
if newDLBC > lastDownloadedBlockCount
then do
showProgress1 rc p
return newDLBC
else do
warnl rc $ sformat ("No block fetched in last " % int % " seconds")
(progressInterval (rdOptions rc))
return lastDownloadedBlockCount
else
if piDownloadedBlockCount p < piTotalBlockCount p
then do
warnl rc $ sformat ("No block fetched in last " % int % " seconds")
(progressInterval (rdOptions rc))
return lastDownloadedBlockCount
else
return (-1)
-- | show download progress in console. designed to run in a thread.
showProgressLoop :: RDClientRuntimeConfig -> Int -> IO ()
showProgressLoop rc lastDownloadedBlockCount = do
threadDelay (progressInterval (rdOptions rc) * 1000000)
newCount <- showProgressMaybe rc lastDownloadedBlockCount
showProgressLoop rc newCount
case newCount of
-1 -> return ()
_ -> showProgressLoop rc newCount
......@@ -85,7 +85,7 @@ spec = do
describe "humanReadableSize" $ do
it "should work" $ do
humanReadableSize 123 `shouldBe` "0.0 MiB"
humanReadableSize 123 `shouldBe` "<1.0 MiB"
humanReadableSize 1048576 `shouldBe` "1.0 MiB"
humanReadableSize 1048579 `shouldBe` "1.0 MiB"
humanReadableSize (1048576 * 2) `shouldBe` "2.0 MiB"
......