From 15109819df0307a85eb2841d981512b4bae82be2 Mon Sep 17 00:00:00 2001
From: Sam
Date: Wed, 15 Mar 2023 15:24:51 +0100
Subject: [PATCH] feat(backend): add data export

---
 backend/db/avatars.go                |  18 +-
 backend/db/export.go                 | 104 +++++++++++
 backend/db/member.go                 |   2 +-
 backend/exporter/exporter.go         | 265 +++++++++++++++++++++++++++
 backend/exporter/types.go            |  64 +++++++
 backend/log/log.go                   |   2 +-
 backend/routes/auth/export.go        |  74 ++++++++
 backend/routes/auth/routes.go        |   5 +
 backend/server/errors.go             |   3 +
 frontend/src/lib/api/entities.ts     |   2 +
 main.go                              |   2 +
 scripts/cleandb/main.go              |  20 ++
 scripts/migrate/008_data_exports.sql |   2 +-
 13 files changed, 559 insertions(+), 4 deletions(-)
 create mode 100644 backend/db/export.go
 create mode 100644 backend/exporter/exporter.go
 create mode 100644 backend/exporter/types.go
 create mode 100644 backend/routes/auth/export.go

diff --git a/backend/db/avatars.go b/backend/db/avatars.go
index 338e743..ae4bd9c 100644
--- a/backend/db/avatars.go
+++ b/backend/db/avatars.go
@@ -221,10 +221,26 @@ func (db *DB) DeleteMemberAvatar(ctx context.Context, memberID xid.ID, hash stri
 		return errors.Wrap(err, "deleting webp avatar")
 	}
 
-	err = db.minio.RemoveObject(ctx, db.minioBucket, "/members/"+memberID.String()+"/"+hash+".webp", minio.RemoveObjectOptions{})
+	err = db.minio.RemoveObject(ctx, db.minioBucket, "/members/"+memberID.String()+"/"+hash+".jpg", minio.RemoveObjectOptions{})
 	if err != nil {
 		return errors.Wrap(err, "deleting jpeg avatar")
 	}
 
 	return nil
 }
+
+func (db *DB) UserAvatar(ctx context.Context, userID xid.ID, hash string) (io.ReadCloser, error) {
+	obj, err := db.minio.GetObject(ctx, db.minioBucket, "/users/"+userID.String()+"/"+hash+".webp", minio.GetObjectOptions{})
+	if err != nil {
+		return nil, errors.Wrap(err, "getting object")
+	}
+	return obj, nil
+}
+
+func (db *DB) MemberAvatar(ctx context.Context, memberID xid.ID, hash string) (io.ReadCloser, error) {
+	obj, err := db.minio.GetObject(ctx, db.minioBucket, "/members/"+memberID.String()+"/"+hash+".webp", minio.GetObjectOptions{})
+	if err != nil {
+		return nil, errors.Wrap(err, "getting object")
+	}
+	return obj, nil
+}
diff --git a/backend/db/export.go b/backend/db/export.go
new file mode 100644
index 0000000..68f8e23
--- /dev/null
+++ b/backend/db/export.go
@@ -0,0 +1,104 @@
+package db
+
+import (
+	"bytes"
+	"context"
+	"time"
+
+	"emperror.dev/errors"
+	"github.com/georgysavva/scany/pgxscan"
+	"github.com/jackc/pgx/v4"
+	"github.com/minio/minio-go/v7"
+	"github.com/rs/xid"
+)
+
+type DataExport struct {
+	ID        int64
+	UserID    xid.ID
+	Filename  string
+	CreatedAt time.Time
+}
+
+func (de DataExport) Path() string {
+	return "/exports/" + de.UserID.String() + "/" + de.Filename + ".zip"
+}
+
+const ErrNoExport = errors.Sentinel("no data export exists")
+
+const KeepExportTime = 7 * 24 * time.Hour
+
+func (db *DB) UserExport(ctx context.Context, userID xid.ID) (de DataExport, err error) {
+	sql, args, err := sq.Select("*").
+		From("data_exports").
+		Where("user_id = ?", userID).
+		OrderBy("id DESC").
+		Limit(1).ToSql()
+	if err != nil {
+		return de, errors.Wrap(err, "building query")
+	}
+
+	err = pgxscan.Get(ctx, db, &de, sql, args...)
+	if err != nil {
+		if errors.Cause(err) == pgx.ErrNoRows {
+			return de, ErrNoExport
+		}
+
+		return de, errors.Wrap(err, "executing sql")
+	}
+	return de, nil
+}
+
+const recentExport = 24 * time.Hour
+
+func (db *DB) HasRecentExport(ctx context.Context, userID xid.ID) (hasExport bool, err error) {
+	err = db.QueryRow(ctx,
+		"SELECT EXISTS(SELECT * FROM data_exports WHERE user_id = $1 AND created_at > $2)",
+		userID, time.Now().Add(-recentExport)).Scan(&hasExport)
+	if err != nil {
+		return false, errors.Wrap(err, "executing query")
+	}
+	return hasExport, nil
+}
+
+func (db *DB) CreateExport(ctx context.Context, userID xid.ID, filename string, file *bytes.Buffer) (de DataExport, err error) {
+	de = DataExport{
+		UserID:   userID,
+		Filename: filename,
+	}
+
+	_, err = db.minio.PutObject(ctx, db.minioBucket, de.Path(), file, int64(file.Len()), minio.PutObjectOptions{
+		ContentType: "application/zip",
+	})
+	if err != nil {
+		return de, errors.Wrap(err, "writing export file")
+	}
+
+	sql, args, err := sq.Insert("data_exports").Columns("user_id", "filename").Values(userID, filename).Suffix("RETURNING *").ToSql()
+	if err != nil {
+		return de, errors.Wrap(err, "building query")
+	}
+
+	err = pgxscan.Get(ctx, db, &de, sql, args...)
+	if err != nil {
+		return de, errors.Wrap(err, "executing sql")
+	}
+	return de, nil
+}
+
+func (db *DB) DeleteExport(ctx context.Context, de DataExport) (err error) {
+	sql, args, err := sq.Delete("data_exports").Where("id = ?", de.ID).ToSql()
+	if err != nil {
+		return errors.Wrap(err, "building query")
+	}
+
+	err = db.minio.RemoveObject(ctx, db.minioBucket, de.Path(), minio.RemoveObjectOptions{})
+	if err != nil {
+		return errors.Wrap(err, "deleting export zip")
+	}
+
+	_, err = db.Exec(ctx, sql, args...)
+	if err != nil {
+		return errors.Wrap(err, "executing sql")
+	}
+	return nil
+}
diff --git a/backend/db/member.go b/backend/db/member.go
index f8c241a..cbf5fc1 100644
--- a/backend/db/member.go
+++ b/backend/db/member.go
@@ -61,7 +61,7 @@ func (db *DB) UserMember(ctx context.Context, userID xid.ID, memberRef string) (
 
 // UserMembers returns all of a user's members, sorted by name.
 func (db *DB) UserMembers(ctx context.Context, userID xid.ID) (ms []Member, err error) {
-	sql, args, err := sq.Select("id", "user_id", "name", "display_name", "bio", "avatar", "names", "pronouns").
+	sql, args, err := sq.Select("*").
 		From("members").Where("user_id = ?", userID).
 		OrderBy("name", "id").ToSql()
 	if err != nil {
diff --git a/backend/exporter/exporter.go b/backend/exporter/exporter.go
new file mode 100644
index 0000000..f475edc
--- /dev/null
+++ b/backend/exporter/exporter.go
@@ -0,0 +1,265 @@
+package exporter
+
+import (
+	"archive/zip"
+	"bytes"
+	"context"
+	"crypto/rand"
+	"encoding/base64"
+	"encoding/json"
+	"io"
+	"net/http"
+	"os"
+	"os/signal"
+	"sync"
+
+	"codeberg.org/u1f320/pronouns.cc/backend/db"
+	"codeberg.org/u1f320/pronouns.cc/backend/log"
+	"github.com/go-chi/chi/v5"
+	"github.com/go-chi/chi/v5/middleware"
+	"github.com/rs/xid"
+	"github.com/urfave/cli/v2"
+)
+
+var Command = &cli.Command{
+	Name:   "exporter",
+	Usage:  "Data exporter service",
+	Action: run,
+}
+
+type server struct {
+	Router chi.Router
+	DB     *db.DB
+
+	exporting   map[xid.ID]struct{}
+	exportingMu sync.Mutex
+}
+
+func run(c *cli.Context) error {
+	port := ":" + os.Getenv("EXPORTER_PORT")
+
+	db, err := db.New()
+	if err != nil {
+		log.Fatalf("creating database: %v", err)
+		return err
+	}
+
+	s := &server{
+		Router:    chi.NewRouter(),
+		DB:        db,
+		exporting: make(map[xid.ID]struct{}),
+	}
+
+	// set up middleware + the single route
+	s.Router.Use(middleware.Recoverer)
+	s.Router.Get("/start/{id}", s.startExport)
+
+	e := make(chan error)
+
+	// run server in another goroutine (for gracefully shutting down, see below)
+	go func() {
+		e <- http.ListenAndServe(port, s.Router)
+	}()
+
+	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+	defer stop()
+
+	log.Infof("API server running at %v!", port)
+
+	select {
+	case <-ctx.Done():
+		log.Info("Interrupt signal received, shutting down...")
+		s.DB.Close()
+		return nil
+	case err := <-e:
+		log.Fatalf("Error running server: %v", err)
+	}
+
+	return nil
+}
+
+func (s *server) startExport(w http.ResponseWriter, r *http.Request) {
+	ctx := r.Context()
+	id, err := xid.FromString(chi.URLParam(r, "id"))
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	u, err := s.DB.User(ctx, id)
+	if err != nil {
+		log.Errorf("getting user %v: %v", id, err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	go s.doExport(u)
+
+	w.WriteHeader(http.StatusAccepted)
+}
+
+func (s *server) doExport(u db.User) {
+	s.exportingMu.Lock()
+	if _, ok := s.exporting[u.ID]; ok {
+		s.exportingMu.Unlock()
+		log.Debugf("user %v is already being exported, aborting", u.ID)
+		return
+	}
+	s.exporting[u.ID] = struct{}{}
+	s.exportingMu.Unlock()
+
+	defer func() {
+		s.exportingMu.Lock()
+		delete(s.exporting, u.ID)
+		s.exportingMu.Unlock()
+	}()
+
+	ctx := context.Background()
+
+	log.Debugf("[%v] starting export of user", u.ID)
+
+	outBuffer := new(bytes.Buffer)
+	zw := zip.NewWriter(outBuffer)
+	defer zw.Close()
+
+	w, err := zw.Create("user.json")
+	if err != nil {
+		log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
+		return
+	}
+
+	log.Debugf("[%v] getting user fields", u.ID)
+
+	fields, err := s.DB.UserFields(ctx, u.ID)
+	if err != nil {
+		log.Errorf("[%v] getting user fields: %v", u.ID, err)
+		return
+	}
+
+	log.Debugf("[%v] writing user json", u.ID)
+
+	ub, err := json.Marshal(dbUserToExport(u, fields))
+	if err != nil {
+		log.Errorf("[%v] marshaling user: %v", u.ID, err)
+		return
+	}
+
+	_, err = w.Write(ub)
+	if err != nil {
+		log.Errorf("[%v] writing user: %v", u.ID, err)
+		return
+	}
+
+	if u.Avatar != nil {
+		log.Debugf("[%v] getting user avatar", u.ID)
+
+		w, err := zw.Create("user_avatar.webp")
+		if err != nil {
+			log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
+			return
+		}
+
+		r, err := s.DB.UserAvatar(ctx, u.ID, *u.Avatar)
+		if err != nil {
+			log.Errorf("[%v] getting user avatar: %v", u.ID, err)
+			return
+		}
+		defer r.Close()
+
+		_, err = io.Copy(w, r)
+		if err != nil {
+			log.Errorf("[%v] writing user avatar: %v", u.ID, err)
+			return
+		}
+
+		log.Debugf("[%v] exported user avatar", u.ID)
+	}
+
+	members, err := s.DB.UserMembers(ctx, u.ID)
+	if err != nil {
+		log.Errorf("[%v] getting user members: %v", u.ID, err)
+		return
+	}
+
+	for _, m := range members {
+		log.Debugf("[%v] starting export for member %v", u.ID, m.ID)
+
+		fields, err := s.DB.MemberFields(ctx, m.ID)
+		if err != nil {
+			log.Errorf("[%v] getting fields for member %v: %v", u.ID, m.ID, err)
+			return
+		}
+
+		w, err := zw.Create("members/" + m.Name + "-" + m.ID.String() + ".json")
+		if err != nil {
+			log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
+			return
+		}
+
+		mb, err := json.Marshal(dbMemberToExport(m, fields))
+		if err != nil {
+			log.Errorf("[%v] marshaling member %v: %v", u.ID, m.ID, err)
+			return
+		}
+
+		_, err = w.Write(mb)
+		if err != nil {
+			log.Errorf("[%v] writing member %v json: %v", u.ID, m.ID, err)
+			return
+		}
+
+		if m.Avatar != nil {
+			log.Debugf("[%v] getting member %v avatar", u.ID, m.ID)
+
+			w, err := zw.Create("members/" + m.Name + "-" + m.ID.String() + "-avatar.webp")
+			if err != nil {
+				log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
+				return
+			}
+
+			r, err := s.DB.MemberAvatar(ctx, m.ID, *m.Avatar)
+			if err != nil {
+				log.Errorf("[%v] getting member %v avatar: %v", u.ID, m.ID, err)
+				return
+			}
+			defer r.Close()
+
+			_, err = io.Copy(w, r)
+			if err != nil {
+				log.Errorf("[%v] writing member %v avatar: %v", u.ID, m.ID, err)
+				return
+			}
+
+			log.Debugf("[%v] exported member %v avatar", u.ID, m.ID)
+		}
+
+		log.Debugf("[%v] finished export for member %v", u.ID, m.ID)
+	}
+
+	log.Debugf("[%v] finished export, uploading to object storage and saving in database", u.ID)
+
+	err = zw.Close()
+	if err != nil {
+		log.Errorf("[%v] closing zip file: %v", u.ID, err)
+		return
+	}
+
+	de, err := s.DB.CreateExport(ctx, u.ID, randomFilename(), outBuffer)
+	if err != nil {
+		log.Errorf("[%v] writing export: %v", u.ID, err)
+		return
+	}
+
+	log.Debugf("[%v] finished writing export. path: %q", u.ID, de.Path())
+}
+
+func randomFilename() string {
+	b := make([]byte, 32)
+
+	_, err := rand.Read(b)
+	if err != nil {
+		panic(err)
+	}
+
+	return base64.RawURLEncoding.EncodeToString(b)
+}
diff --git a/backend/exporter/types.go b/backend/exporter/types.go
new file mode 100644
index 0000000..f0b5165
--- /dev/null
+++ b/backend/exporter/types.go
@@ -0,0 +1,64 @@
+package exporter
+
+import (
+	"codeberg.org/u1f320/pronouns.cc/backend/db"
+	"github.com/rs/xid"
+)
+
+type userExport struct {
+	ID          xid.ID  `json:"id"`
+	Username    string  `json:"name"`
+	DisplayName *string `json:"display_name"`
+	Bio         *string `json:"bio"`
+
+	Links []string `json:"links"`
+
+	Names    []db.FieldEntry   `json:"names"`
+	Pronouns []db.PronounEntry `json:"pronouns"`
+	Fields   []db.Field        `json:"fields"`
+
+	Discord         *string `json:"discord"`
+	DiscordUsername *string `json:"discord_username"`
+
+	MaxInvites int `json:"max_invites"`
+}
+
+func dbUserToExport(u db.User, fields []db.Field) userExport {
+	return userExport{
+		ID:              u.ID,
+		Username:        u.Username,
+		DisplayName:     u.DisplayName,
+		Bio:             u.Bio,
+		Links:           u.Links,
+		Names:           u.Names,
+		Pronouns:        u.Pronouns,
+		Fields:          fields,
+		Discord:         u.Discord,
+		DiscordUsername: u.DiscordUsername,
+		MaxInvites:      u.MaxInvites,
+	}
+}
+
+type memberExport struct {
+	ID          xid.ID            `json:"id"`
+	Name        string            `json:"name"`
+	DisplayName *string           `json:"display_name"`
+	Bio         *string           `json:"bio"`
+	Links       []string          `json:"links"`
+	Names       []db.FieldEntry   `json:"names"`
+	Pronouns    []db.PronounEntry `json:"pronouns"`
+	Fields      []db.Field        `json:"fields"`
+}
+
+func dbMemberToExport(m db.Member, fields []db.Field) memberExport {
+	return memberExport{
+		ID:          m.ID,
+		Name:        m.Name,
+		DisplayName: m.DisplayName,
+		Bio:         m.Bio,
+		Links:       m.Links,
+		Names:       m.Names,
+		Pronouns:    m.Pronouns,
+		Fields:      fields,
+	}
+}
diff --git a/backend/log/log.go b/backend/log/log.go
index 78d6e61..5727ace 100644
--- a/backend/log/log.go
+++ b/backend/log/log.go
@@ -12,7 +12,7 @@ var SugaredLogger *zap.SugaredLogger
 
 func init() {
 	zcfg := zap.NewProductionConfig()
-	zcfg.Level.SetLevel(zap.InfoLevel)
+	zcfg.Level.SetLevel(zap.DebugLevel)
 	zcfg.Encoding = "console"
 	zcfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
 	zcfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
diff --git a/backend/routes/auth/export.go b/backend/routes/auth/export.go
new file mode 100644
index 0000000..7454eef
--- /dev/null
+++ b/backend/routes/auth/export.go
@@ -0,0 +1,74 @@
+package auth
+
+import (
+	"net/http"
+	"time"
+
+	"codeberg.org/u1f320/pronouns.cc/backend/db"
+	"codeberg.org/u1f320/pronouns.cc/backend/log"
+	"codeberg.org/u1f320/pronouns.cc/backend/server"
+	"github.com/go-chi/render"
+)
+
+func (s *Server) startExport(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	claims, _ := server.ClaimsFromContext(ctx)
+
+	hasExport, err := s.DB.HasRecentExport(ctx, claims.UserID)
+	if err != nil {
+		log.Errorf("checking if user has recent export: %v", err)
+		return server.APIError{Code: server.ErrInternalServerError}
+	}
+	if hasExport {
+		return server.APIError{Code: server.ErrRecentExport}
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "GET", s.ExporterPath+"/start/"+claims.UserID.String(), nil)
+	if err != nil {
+		log.Errorf("creating start export request: %v", err)
+		return server.APIError{Code: server.ErrInternalServerError}
+	}
+
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		log.Errorf("executing start export request: %v", err)
+		return server.APIError{Code: server.ErrInternalServerError}
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusAccepted {
+		log.Errorf("got non-%v code: %v", http.StatusAccepted, resp.StatusCode)
+		return server.APIError{
+			Code: server.ErrInternalServerError,
+		}
+	}
+
+	render.JSON(w, r, map[string]any{"started": true})
+	return nil
+}
+
+type dataExportResponse struct {
+	Path      string    `json:"path"`
+	CreatedAt time.Time `json:"created_at"`
+}
+
+func (s *Server) getExport(w http.ResponseWriter, r *http.Request) error {
+	ctx := r.Context()
+	claims, _ := server.ClaimsFromContext(ctx)
+
+	de, err := s.DB.UserExport(ctx, claims.UserID)
+	if err != nil {
+		if err == db.ErrNoExport {
+			return server.APIError{Code: server.ErrNotFound}
+		}
+
+		log.Errorf("getting export for user %v: %v", claims.UserID, err)
+		return err
+	}
+
+	render.JSON(w, r, dataExportResponse{
+		Path:      de.Path(),
+		CreatedAt: de.CreatedAt,
+	})
+	return nil
+}
diff --git a/backend/routes/auth/routes.go b/backend/routes/auth/routes.go
index e19fcf2..39d1a61 100644
--- a/backend/routes/auth/routes.go
+++ b/backend/routes/auth/routes.go
@@ -17,6 +17,7 @@ type Server struct {
 	*server.Server
 
 	RequireInvite bool
+	ExporterPath  string
 }
 
 type userResponse struct {
@@ -54,6 +55,7 @@ func Mount(srv *server.Server, r chi.Router) {
 	s := &Server{
 		Server:        srv,
 		RequireInvite: os.Getenv("REQUIRE_INVITE") == "true",
+		ExporterPath:  "http://127.0.0.1:" + os.Getenv("EXPORTER_PORT"),
 	}
 
 	r.Route("/auth", func(r chi.Router) {
@@ -79,6 +81,9 @@ func Mount(srv *server.Server, r chi.Router) {
 		r.With(server.MustAuth).Post("/tokens", server.WrapHandler(s.createToken))
 		r.With(server.MustAuth).Delete("/tokens/{id}", server.WrapHandler(s.deleteToken))
 
+		r.With(server.MustAuth).Get("/export/start", server.WrapHandler(s.startExport))
+		r.With(server.MustAuth).Get("/export", server.WrapHandler(s.getExport))
+
 		// cancel user delete
 		// uses a special token, so handled in the function itself
 		r.Get("/cancel-delete", server.WrapHandler(s.cancelDelete))
diff --git a/backend/server/errors.go b/backend/server/errors.go
index ae4cf23..bbe35d2 100644
--- a/backend/server/errors.go
+++ b/backend/server/errors.go
@@ -92,6 +92,7 @@ const (
 	ErrInviteLimitReached = 1009 // invite limit reached (when creating invites)
 	ErrInviteAlreadyUsed  = 1010 // invite already used (when signing up)
 	ErrDeletionPending    = 1011 // own user deletion pending, returned with undo code
+	ErrRecentExport       = 1012 // latest export is too recent
 
 	// User-related error codes
 	ErrUserNotFound = 2001
@@ -126,6 +127,7 @@ var errCodeMessages = map[int]string{
 	ErrInviteLimitReached: "Your account has reached the invite limit",
 	ErrInviteAlreadyUsed:  "That invite code has already been used",
 	ErrDeletionPending:    "Your account is pending deletion",
+	ErrRecentExport:       "Your latest data export is less than 1 day old",
 
 	ErrUserNotFound: "User not found",
 
@@ -157,6 +159,7 @@ var errCodeStatuses = map[int]int{
 	ErrInviteLimitReached: http.StatusForbidden,
 	ErrInviteAlreadyUsed:  http.StatusBadRequest,
 	ErrDeletionPending:    http.StatusBadRequest,
+	ErrRecentExport:       http.StatusBadRequest,
 
 	ErrUserNotFound: http.StatusNotFound,
 
diff --git a/frontend/src/lib/api/entities.ts b/frontend/src/lib/api/entities.ts
index 66861dc..0f7ad2d 100644
--- a/frontend/src/lib/api/entities.ts
+++ b/frontend/src/lib/api/entities.ts
@@ -102,6 +102,7 @@ export enum ErrorCode {
   InvitesDisabled = 1008,
   InviteLimitReached = 1009,
   InviteAlreadyUsed = 1010,
+  RecentExport = 1012,
 
   UserNotFound = 2001,
 
@@ -109,6 +110,7 @@ export enum ErrorCode {
   MemberNotFound = 3001,
   MemberLimitReached = 3002,
   RequestTooBig = 4001,
+  MissingPermissions = 4002,
 }
 
 export const pronounDisplay = (entry: Pronoun) => {
diff --git a/main.go b/main.go
index eb78f8a..40b4534 100644
--- a/main.go
+++ b/main.go
@@ -5,6 +5,7 @@ import (
 	"os"
 
 	"codeberg.org/u1f320/pronouns.cc/backend"
+	"codeberg.org/u1f320/pronouns.cc/backend/exporter"
 	"codeberg.org/u1f320/pronouns.cc/backend/server"
 	"codeberg.org/u1f320/pronouns.cc/scripts/cleandb"
 	"codeberg.org/u1f320/pronouns.cc/scripts/migrate"
@@ -18,6 +19,7 @@ var app = &cli.App{
 	Version: server.Tag,
 	Commands: []*cli.Command{
 		backend.Command,
+		exporter.Command,
 		{
 			Name:    "database",
 			Aliases: []string{"db"},
diff --git a/scripts/cleandb/main.go b/scripts/cleandb/main.go
index dc1c001..2741ed4 100644
--- a/scripts/cleandb/main.go
+++ b/scripts/cleandb/main.go
@@ -46,6 +46,26 @@ func run(c *cli.Context) error {
 	fmt.Printf("deleted %v invalidated or expired tokens\n", ct.RowsAffected())
 
+	fmt.Println("deleting expired export files")
+	var exports []dbpkg.DataExport
+	err = pgxscan.Select(ctx, db, &exports, "SELECT * FROM data_exports WHERE created_at < $1", time.Now().Add(-dbpkg.KeepExportTime))
+	if err != nil {
+		fmt.Println("error getting to-be-deleted export files:", err)
+		return err
+	}
+
+	for _, de := range exports {
+		err = db.DeleteExport(ctx, de)
+		if err != nil {
+			fmt.Printf("error deleting export %v: %v\n", de.ID, err)
+			continue
+		}
+
+		fmt.Println("deleted export", de.ID)
+	}
+
+	fmt.Printf("deleted %v expired exports\n", len(exports))
+
 	var users []dbpkg.User
 	err = pgxscan.Select(ctx, db, &users,
 		`SELECT * FROM users WHERE
 		deleted_at IS NOT NULL AND
diff --git a/scripts/migrate/008_data_exports.sql b/scripts/migrate/008_data_exports.sql
index af11a5e..937a56c 100644
--- a/scripts/migrate/008_data_exports.sql
+++ b/scripts/migrate/008_data_exports.sql
@@ -5,6 +5,6 @@
 create table data_exports (
     id serial primary key,
     user_id text not null references users (id) on delete cascade,
-    hash text not null,
+    filename text not null,
     created_at timestamptz not null default now()
 );
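
Note on consuming an export (illustrative only, not part of the patch): doExport writes a flat zip archive containing user.json, an optional user_avatar.webp, and one members/<name>-<id>.json (plus an optional members/<name>-<id>-avatar.webp) per member, which CreateExport then uploads to the path returned by DataExport.Path(). The sketch below reads such an archive with nothing but the Go standard library; the local file name export.zip and the printing logic are hypothetical, only the entry names come from the exporter above.

package main

import (
	"archive/zip"
	"encoding/json"
	"fmt"
	"log"
	"strings"
)

func main() {
	// export.zip is a placeholder for an archive fetched from the path
	// reported by GET /auth/export.
	zr, err := zip.OpenReader("export.zip")
	if err != nil {
		log.Fatalf("opening export: %v", err)
	}
	defer zr.Close()

	for _, f := range zr.File {
		switch {
		case f.Name == "user.json":
			rc, err := f.Open()
			if err != nil {
				log.Fatalf("opening %s: %v", f.Name, err)
			}
			var u map[string]any
			if err := json.NewDecoder(rc).Decode(&u); err != nil {
				log.Fatalf("decoding %s: %v", f.Name, err)
			}
			rc.Close()
			// "name" matches the json tag on userExport.Username.
			fmt.Println("user:", u["name"])
		case strings.HasPrefix(f.Name, "members/") && strings.HasSuffix(f.Name, ".json"):
			fmt.Println("member file:", f.Name)
		default:
			// avatar webp files and anything else
			fmt.Println("asset:", f.Name)
		}
	}
}

The member JSON entries follow the memberExport shape from types.go, so decoding them works the same way as user.json.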