diff --git a/go.mod b/go.mod index ac74f29..2cd4f3c 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect @@ -117,6 +118,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/moby/term v0.5.0 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.12 // indirect @@ -131,10 +133,15 @@ require ( github.com/stretchr/testify v1.9.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.mongodb.org/mongo-driver v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/go.sum b/go.sum index 5182aa3..046004c 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -276,6 +278,7 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= @@ -346,9 +349,14 @@ github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -356,6 +364,7 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -365,6 +374,8 @@ github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= +go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= +go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= diff --git a/pkg/api/auth.go b/pkg/api/auth.go index 83cd711..69386c3 100644 --- a/pkg/api/auth.go +++ b/pkg/api/auth.go @@ -39,6 +39,7 @@ func (a *ScratchDataAPIStruct) AuthMiddleware(next http.Handler) http.Handler { } else { // Otherwise, this API key is specific to a user keyDetails, err := a.storageServices.Database.GetAPIKeyDetails(r.Context(), hashedKey) + log.Print(keyDetails.Destination.TeamID) if err != nil { w.WriteHeader(http.StatusUnauthorized) diff --git a/pkg/api/data.go b/pkg/api/data.go index d879a92..d2e4de0 100644 --- a/pkg/api/data.go +++ b/pkg/api/data.go @@ -99,6 +99,9 @@ func (a *ScratchDataAPIStruct) executeQueryAndStreamData(ctx context.Context, w case "csv": w.Header().Set("Content-Type", "text/csv") return dest.QueryCSV(query, w) + case "ndjson": + w.Header().Set("Content-Type", "text/plain") + return dest.QueryNDJson(query, w) default: w.Header().Set("Content-Type", "application/json") return dest.QueryJSON(query, w) diff --git a/pkg/destinations/destinations.go b/pkg/destinations/destinations.go index a26ad67..df819c7 100644 --- a/pkg/destinations/destinations.go +++ b/pkg/destinations/destinations.go @@ -14,6 +14,7 @@ import ( "github.com/scratchdata/scratchdata/pkg/destinations/bigquery" "github.com/scratchdata/scratchdata/pkg/destinations/clickhouse" "github.com/scratchdata/scratchdata/pkg/destinations/duckdb" + "github.com/scratchdata/scratchdata/pkg/destinations/mongodb" "github.com/scratchdata/scratchdata/pkg/destinations/redshift" ) @@ -63,14 +64,16 @@ func (m *DestinationManager) TestCredentials(creds config.Destination) error { var dest Destination var err error switch creds.Type { - case "duckdb": - dest, err = duckdb.OpenServer(creds.Settings) + case "bigquery": + dest, err = bigquery.OpenServer(creds.Settings) case "clickhouse": dest, err = clickhouse.OpenServer(creds.Settings) + case "duckdb": + dest, err = duckdb.OpenServer(creds.Settings) + case "mongodb": + dest, err = mongodb.OpenServer(creds.Settings) case "redshift": dest, err = redshift.OpenServer(creds.Settings) - case "bigquery": - dest, err = bigquery.OpenServer(creds.Settings) default: err = errors.New("Invalid destination type") } @@ -110,6 +113,8 @@ func (m *DestinationManager) Destination(ctx context.Context, databaseID int64) dest, err = redshift.OpenServer(settings) case "bigquery": dest, err = bigquery.OpenServer(settings) + case "mongodb": + dest, err = mongodb.OpenServer(settings) } if err != nil { diff --git a/pkg/destinations/mongodb/mongodb.go b/pkg/destinations/mongodb/mongodb.go new file mode 100644 index 0000000..d273a0a --- /dev/null +++ b/pkg/destinations/mongodb/mongodb.go @@ -0,0 +1,100 @@ +package mongodb + +import ( + "context" + "encoding/json" + "io" + + "github.com/scratchdata/scratchdata/models" + "github.com/scratchdata/scratchdata/pkg/util" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type MongoDBServer struct { + URI string `mapstructure:"uri"` + Database string `mapstructure:"database"` + + db *mongo.Database +} + +func (s *MongoDBServer) QueryNDJson(query string, writer io.Writer) error { + coll := s.db.Collection("transactions") + + cursor, err := coll.Find(context.TODO(), bson.M{}, options.Find().SetLimit(2)) + if err != nil { + return err + } + + for cursor.Next(context.TODO()) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + return err + } + + v, err := json.Marshal(result) + if err != nil { + return err + } + writer.Write(v) + writer.Write([]byte("\n")) + } + + return nil +} +func (s *MongoDBServer) QueryJSON(query string, writer io.Writer) error { + // Create a buffered reader for efficient reading + // ndjsonReader := strings.NewReader(ndjsonData) + // reader := bufio.NewReader(ndjsonReader) + + // // Create a buffered writer for efficient writing + // writer := bufio.NewWriter(jsonWriter) + + // query = `db.xy.find({"a":{}})` + + // tokens := strings.SplitN(query, ".", 3) + // log.Print(tokens) + // collection := tokens[1] + + // start := strings.Index(query, "(") + // end := strings.LastIndex(query, ")") + + // jsonList := "[" + query[start+1:end] + "]" + // log.Print(jsonList) + + // // isFind := strings.HasPrefix(tokens[2], "find(") + // // isAggregate := strings.HasPrefix(tokens[2], "aggregate(") + + return nil +} +func (s *MongoDBServer) QueryCSV(query string, writer io.Writer) error { return nil } + +func (s *MongoDBServer) Tables() ([]string, error) { return nil, nil } +func (s *MongoDBServer) Columns(table string) ([]models.Column, error) { return nil, nil } + +func (s *MongoDBServer) CreateEmptyTable(name string) error { return nil } +func (s *MongoDBServer) CreateColumns(table string, filePath string) error { return nil } +func (s *MongoDBServer) InsertFromNDJsonFile(table string, filePath string) error { return nil } + +func (s *MongoDBServer) Close() error { + return nil +} + +func OpenServer(settings map[string]any) (*MongoDBServer, error) { + srv := util.ConfigToStruct[MongoDBServer](settings) + + serverAPI := options.ServerAPI(options.ServerAPIVersion1) + opts := options.Client().ApplyURI(srv.URI).SetServerAPIOptions(serverAPI) + + client, err := mongo.Connect(context.TODO(), opts) + if err != nil { + return nil, err + } + + db := client.Database(srv.Database) + + srv.db = db + + return srv, nil +} diff --git a/pkg/storage/database/gorm/gorm.go b/pkg/storage/database/gorm/gorm.go index 232f450..3ac5523 100644 --- a/pkg/storage/database/gorm/gorm.go +++ b/pkg/storage/database/gorm/gorm.go @@ -243,8 +243,16 @@ func (s *Gorm) GetDestinations(c context.Context, userId uint) ([]models.Destina } func (s *Gorm) GetDestination(c context.Context, teamId, destId uint) (models.Destination, error) { + + var destinations []models.Destination + // res := s.db.Where("team_id = ?", teamId).Find(&destinations) + res := s.db.Find(&destinations) + for _, d := range destinations { + log.Print(d.ID, d.Type, d.TeamID) + } + var dest models.Destination - res := s.db.First(&dest, "team_id = ? AND id = ?", teamId, destId) + res = s.db.First(&dest, "team_id = ? AND id = ?", teamId, destId) if res.Error != nil { return dest, res.Error } @@ -300,12 +308,16 @@ func (s *Gorm) CreateUser(email string, source string, details string) (*models. func (s *Gorm) GetAPIKeyDetails(ctx context.Context, hashedKey string) (models.APIKey, error) { var dbKey models.APIKey + // TODO: get destination and team - tx := s.db.First(&dbKey, "hashed_api_key = ?", hashedKey) + tx := s.db.Joins("Destination.Team").First(&dbKey, "hashed_api_key = ?", hashedKey) if tx.RowsAffected == 0 { return models.APIKey{}, errors.New("api key not found") } + log.Print(dbKey.Destination) + log.Print(dbKey.Destination.Team) + return dbKey, nil } diff --git a/pkg/workers/copier.go b/pkg/workers/copier.go index b3c3194..f5021b0 100644 --- a/pkg/workers/copier.go +++ b/pkg/workers/copier.go @@ -1,11 +1,13 @@ package workers import ( + "bufio" "context" "os" "path/filepath" "github.com/rs/zerolog/log" + "github.com/scratchdata/scratchdata/pkg/api" "github.com/scratchdata/scratchdata/pkg/util" ) @@ -56,24 +58,104 @@ func (w *ScratchDataWorker) CopyData(sourceId int64, query string, destId uint, return err } - err = dest.CreateEmptyTable(destTable) - if err != nil { - return err - } + // Algorithm demuxes NDJSON that is nested into JSON with multiple tables + + flattener := api.NewMultiTableFlattener() + fds := map[string]*util.ChunkedWriter{} + maxCapacity := 1024 * 1024 * 10 + buf := make([]byte, maxCapacity) for _, f := range files { path := filepath.Join(localFolder, f.Name()) - err = dest.CreateColumns(destTable, path) + fd, err := os.Open(path) if err != nil { - log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to create columns") - continue + return err } - err = dest.InsertFromNDJsonFile(destTable, path) + + scanner := bufio.NewScanner(fd) + scanner.Buffer(buf, maxCapacity) + + for scanner.Scan() { + line := scanner.Text() + flatItems, err := flattener.Flatten(destTable, line) + if err != nil { + return err + } + + for _, item := range flatItems { + + tableFd, ok := fds[item.Table] + if !ok { + ndjsonPath := filepath.Join(localFolder, "tables", item.Table) + err = os.MkdirAll(ndjsonPath, os.ModePerm) + if err != nil { + return err + } + + tableFd = util.NewChunkedWriter(w.Config.MaxBulkQuerySizeBytes, w.Config.BulkChunkSizeBytes, ndjsonPath) + fds[item.Table] = tableFd + } + + tableFd.Write([]byte(item.JSON)) + tableFd.Write([]byte("\n")) + } + } + + fd.Close() + } + + for table, fd := range fds { + err = fd.Close() + if err != nil { + return err + } + + folderName := filepath.Join(localFolder, "tables", table) + files, err := os.ReadDir(folderName) + if err != nil { + return err + } + + err = dest.CreateEmptyTable(table) if err != nil { - log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to insert data") - continue + return err + } + + for _, f := range files { + path := filepath.Join(folderName, f.Name()) + err = dest.CreateColumns(table, path) + if err != nil { + log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", table).Msg("Unable to create columns") + continue + } + err = dest.InsertFromNDJsonFile(table, path) + if err != nil { + log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", table).Msg("Unable to insert data") + continue + } } + } + // This is the regular algorithm without demuxing 1 table to many. Assumes the NDJSON is flat. + // err = dest.CreateEmptyTable(destTable) + // if err != nil { + // return err + // } + + // for _, f := range files { + // path := filepath.Join(localFolder, f.Name()) + // err = dest.CreateColumns(destTable, path) + // if err != nil { + // log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to create columns") + // continue + // } + // err = dest.InsertFromNDJsonFile(destTable, path) + // if err != nil { + // log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to insert data") + // continue + // } + // } + return nil }