作者:牛奶香橙 | 来源:互联网 | 2023-09-24 19:10
我正在尝试使用管道重新实现此功能(不是因为比起'streaming'而不是'pipes'而是为了减少项目中使用的流式传输库的数量):
import qualified Data.ByteString.Streaming as Q
import Data.ByteString.Streaming.HTTP
downloadToFile :: MonadIO m => URI ->
(SomeException -> err) ->
Handle ->
m (Either err Hash)
downloadToFile (URI uri) err destinatiOnHandle=
liftIO $ first err <$> try go
where
go = do
req <- parseRequest $ T.unpack uri
tls <- newManager tlsManagerSettings
hash <- newIORef S256.init
withHTTP req tls $ \resp ->
Q.hPut destinationHandle $
Q.chunkMapM (\chunk ->
modifyIORef' hash (`S256.update` chunk) >> pure chunk) $
responseBody resp
hh <- readIORef hash
pure $ Hash $ S256.finalize hh
它将HTTP流写入文件(destinationHandle
),并以流方式计算SHA256。我开始在管道和管道-http中实现它,而我的第一个想法就是按照以下方式创建一些东西:
import Pipes
import Pipes.HTTP
import qualified Pipes.ByteString as PB
import qualified Pipes.Prelude as P
downloadToFile :: MonadIO m => URI ->
(SomeException -> err) ->
Handle ->
m (Either err Hash)
downloadToFile (URI uri) err destinatiOnHandle=
liftIO $ first err <$> try go
where
go = do
req <- parseRequest $ T.unpack uri
tls <- newManager tlsManagerSettings
hash <- newIORef S256.init
withHTTP req tls $ \resp ->
runEffect $ responseBody resp >->
P.tee (PB.toHandle destinationHandle) >->
hashConsumer
hashCOnsumer= flip evalstatet S256.init $ do
chunk <- lift $ await
modify' $ `S256.update` chunk
-- what now???
但是我不明白如何确定流何时结束以及应该在哪里调用S256.finalize。我相信,在管道await
中返回Maybe a
,它通常用于指示流的结束,但是我不确定如何将其转换为惯用管道代码。
我认为这是两个问题:
- 如何完成一些有状态的
Consumer
- 或如何折叠流并获取折叠值,以便有人可以
为我指出一个很好的例子,那太好了。