1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
// Gunzip streams the gzip-compressed object key from sourceBucket, decompresses
// it on the fly, and uploads the result to destinationBucket under the same key
// with a trailing ".gz" removed.
//
// The download and the upload run in separate goroutines joined by an
// in-memory pipe, so the object is never buffered in full. Gunzip blocks until
// both goroutines have finished. Failures are reported through exitErrorf or
// log.Fatalln, matching the rest of the function's error style.
func Gunzip(sourceBucket, destinationBucket, key string) {
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String(region),
	})
	if err != nil {
		exitErrorf("Unable to create session, %v", err)
		return
	}

	// The pipe connects the downloader (writer side) to the uploader (reader side).
	reader, writer := io.Pipe()

	downloader := s3manager.NewDownloader(sess)
	// The pipe can only accept bytes in order. Parts fetched concurrently may
	// arrive out of order, so force a single sequential download.
	// NOTE(review): assumes FakeWriterAt ignores the WriteAt offset and simply
	// forwards to the wrapped writer — confirm against its definition.
	downloader.Concurrency = 1

	// Wait for both the downloader and the uploader.
	wg := sync.WaitGroup{}
	wg.Add(2)

	// Downloader: S3 object -> pipe writer.
	go func() {
		var dlErr error
		defer func() {
			// The writer must be closed or the reading end never finishes.
			// CloseWithError(nil) is a plain Close; a non-nil error is
			// surfaced to the reader instead of a clean EOF.
			writer.CloseWithError(dlErr)
			wg.Done()
		}()

		var numBytes int64
		numBytes, dlErr = downloader.Download(FakeWriterAt{writer},
			&s3.GetObjectInput{
				Bucket: aws.String(sourceBucket),
				Key:    aws.String(key),
			})
		if dlErr != nil {
			exitErrorf("Unable to download item %q, %v", key, dlErr)
			return
		}
		log.Printf("Downloaded %d bytes", numBytes)
	}()

	// Uploader: pipe reader -> gzip decoder -> S3 object.
	go func() {
		defer wg.Done()

		// NewReader reads the gzip header, so it fails on non-gzip input or
		// when the download side closed the pipe with an error.
		gzReader, err := gzip.NewReader(reader)
		if err != nil {
			log.Fatalln("Failed to read gzip stream", err)
		}
		defer gzReader.Close()

		uploader := s3manager.NewUploader(sess)
		// Kept for backward compatibility: this is stored as user metadata,
		// not as the object's actual Content-Type header.
		metadata := map[string]*string{
			"Content-Type": aws.String("text/plain"),
		}
		result, err := uploader.Upload(&s3manager.UploadInput{
			Body:        gzReader,
			Bucket:      aws.String(destinationBucket),
			ContentType: aws.String("text/plain"),
			// Strip only the trailing extension; ".gz" elsewhere in the key
			// (e.g. inside a prefix) must be left alone.
			Key:      aws.String(strings.TrimSuffix(key, ".gz")),
			Metadata: metadata,
		})
		if err != nil {
			log.Fatalln("Failed to upload", err)
		}
		log.Println("Successfully uploaded to", result.Location)
	}()

	wg.Wait()
}
|