From a217695848641a7cda4a8a5a65ea7e640e2c9fd6 Mon Sep 17 00:00:00 2001 From: poundifdef Date: Fri, 5 Apr 2024 09:26:29 -0400 Subject: [PATCH 01/10] flattener iteration --- pkg/api/flattener.go | 79 +++++++++++++++++++++++++++++++++++++++ pkg/api/flattener_test.go | 29 ++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 pkg/api/flattener_test.go diff --git a/pkg/api/flattener.go b/pkg/api/flattener.go index eb47f6b0..0d98db4f 100644 --- a/pkg/api/flattener.go +++ b/pkg/api/flattener.go @@ -2,9 +2,13 @@ package api import ( "encoding/json" + + "github.com/bwmarrin/snowflake" "github.com/jeremywohl/flatten" "github.com/oklog/ulid/v2" "github.com/rs/zerolog/log" + "github.com/scratchdata/scratchdata/pkg/util" + "github.com/tidwall/gjson" "github.com/tidwall/sjson" ) @@ -152,3 +156,78 @@ func (h HorizontalFlattener) Flatten(table string, json string) ([]JSONData, err return rc, nil } + +type MultiTableFlattener struct { + snowflake *snowflake.Node +} + +func NewMultiTableFlattener() MultiTableFlattener { + snowflake, _ := util.NewSnowflakeGenerator() + return MultiTableFlattener{snowflake: snowflake} +} + +func (f MultiTableFlattener) FlattenJSON(table string, data gjson.Result, parent_table string, parent_id int64, output []JSONData) { + if data.IsObject() { + oid := f.snowflake.Generate().Int64() + rc := map[string]any{ + "id": oid, + table: data.Value(), + } + if parent_table != "" { + rc[parent_table+"_id"] = parent_id + } + j, _ := json.Marshal(rc) + + data.ForEach(func(key, value gjson.Result) bool { + if value.IsArray() || value.IsObject() { + f.FlattenJSON(key.String(), value, table, oid, output) + + } else { + rc[key.String()] = value.Value() + } + + return true // keep iterating + }) + + output = append(output, JSONData{Table: table, JSON: string(j)}) + + } else if data.IsArray() { + for _, v := range data.Array() { + f.FlattenJSON(table, v, parent_table, parent_id, output) + } + } else { + rc := map[string]any{ + "id": f.snowflake.Generate().Int64(), + table: data.Value(), + } + if parent_table != "" { + rc[parent_table+"_id"] = parent_id + } + j, _ := json.Marshal(rc) + output = append(output, JSONData{Table: table, JSON: string(j)}) + } + +} + +func (f MultiTableFlattener) Flatten(table string, json string) ([]JSONData, error) { + rc := []JSONData{} + + parsed := gjson.Parse(json) + f.FlattenJSON(table, parsed, "", 0, rc) + + log.Print("READY") + log.Print(rc) + log.Print("DONE") + + return rc, nil + // flat, err := flatten.FlattenString(json, "", flatten.UnderscoreStyle) + // if err != nil { + // return nil, err + // } + + // rc := []JSONData{ + // {Table: table, JSON: flat}, + // } + + // return rc, nil +} diff --git a/pkg/api/flattener_test.go b/pkg/api/flattener_test.go new file mode 100644 index 00000000..4a5a6459 --- /dev/null +++ b/pkg/api/flattener_test.go @@ -0,0 +1,29 @@ +package api + +import ( + "log" + "testing" +) + +func TestMultiTableFlattener(t *testing.T) { + json_str := `{ + "name": "John Doe", + "age": 30, + "address": { + "street": "123 Main St", + "city": "Anytown" + }, + "hobbies": [ + {"name": "Reading", "type": "Indoor"}, + {"name": "Cycling", "type": "Outdoor", "nested": {"scalar": "bar", "list": [1,2], "obj": {"hello":"world"}}} + ], + "numbers": [11, 22, 33] + }` + + f := NewMultiTableFlattener() + rc, _ := f.Flatten("t", json_str) + for _, v := range rc { + log.Println(v.Table, v.JSON) + + } +} From 10ab20c5516e6e985f96bbe2e320736ec5d33d30 Mon Sep 17 00:00:00 2001 From: poundifdef Date: Fri, 5 Apr 2024 09:48:15 -0400 Subject: [PATCH 02/10] working flattener --- pkg/api/flattener.go | 51 ++++++++++++++++++--------------------- pkg/api/flattener_test.go | 1 - 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/pkg/api/flattener.go b/pkg/api/flattener.go index 0d98db4f..b32ca416 100644 --- a/pkg/api/flattener.go +++ b/pkg/api/flattener.go @@ -166,22 +166,25 @@ func NewMultiTableFlattener() MultiTableFlattener { return MultiTableFlattener{snowflake: snowflake} } -func (f MultiTableFlattener) FlattenJSON(table string, data gjson.Result, parent_table string, parent_id int64, output []JSONData) { +func (f MultiTableFlattener) FlattenJSON(table string, data gjson.Result, parent_table string, parent_id int64) ([]JSONData, error) { + output := []JSONData{} + if data.IsObject() { oid := f.snowflake.Generate().Int64() rc := map[string]any{ - "id": oid, - table: data.Value(), + "id": oid, } if parent_table != "" { rc[parent_table+"_id"] = parent_id } - j, _ := json.Marshal(rc) data.ForEach(func(key, value gjson.Result) bool { if value.IsArray() || value.IsObject() { - f.FlattenJSON(key.String(), value, table, oid, output) - + nestedData, err := f.FlattenJSON(key.String(), value, table, oid) + if err != nil { + return false + } + output = append(output, nestedData...) } else { rc[key.String()] = value.Value() } @@ -189,11 +192,19 @@ func (f MultiTableFlattener) FlattenJSON(table string, data gjson.Result, parent return true // keep iterating }) + j, err := json.Marshal(rc) + if err != nil { + return []JSONData{}, err + } output = append(output, JSONData{Table: table, JSON: string(j)}) } else if data.IsArray() { for _, v := range data.Array() { - f.FlattenJSON(table, v, parent_table, parent_id, output) + nestedData, err := f.FlattenJSON(table, v, parent_table, parent_id) + if err != nil { + return []JSONData{}, err + } + output = append(output, nestedData...) } } else { rc := map[string]any{ @@ -203,31 +214,17 @@ func (f MultiTableFlattener) FlattenJSON(table string, data gjson.Result, parent if parent_table != "" { rc[parent_table+"_id"] = parent_id } - j, _ := json.Marshal(rc) + j, err := json.Marshal(rc) + if err != nil { + return []JSONData{}, err + } output = append(output, JSONData{Table: table, JSON: string(j)}) } + return output, nil } func (f MultiTableFlattener) Flatten(table string, json string) ([]JSONData, error) { - rc := []JSONData{} - parsed := gjson.Parse(json) - f.FlattenJSON(table, parsed, "", 0, rc) - - log.Print("READY") - log.Print(rc) - log.Print("DONE") - - return rc, nil - // flat, err := flatten.FlattenString(json, "", flatten.UnderscoreStyle) - // if err != nil { - // return nil, err - // } - - // rc := []JSONData{ - // {Table: table, JSON: flat}, - // } - - // return rc, nil + return f.FlattenJSON(table, parsed, "", 0) } diff --git a/pkg/api/flattener_test.go b/pkg/api/flattener_test.go index 4a5a6459..f7805d12 100644 --- a/pkg/api/flattener_test.go +++ b/pkg/api/flattener_test.go @@ -24,6 +24,5 @@ func TestMultiTableFlattener(t *testing.T) { rc, _ := f.Flatten("t", json_str) for _, v := range rc { log.Println(v.Table, v.JSON) - } } From da2ea3560e741d8fdec87325dae80282c609be4e Mon Sep 17 00:00:00 2001 From: poundifdef Date: Fri, 5 Apr 2024 09:54:34 -0400 Subject: [PATCH 03/10] add multi table flattening option --- pkg/api/data.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/api/data.go b/pkg/api/data.go index 00ffb490..d879a92a 100644 --- a/pkg/api/data.go +++ b/pkg/api/data.go @@ -113,6 +113,8 @@ func (a *ScratchDataAPIStruct) Insert(w http.ResponseWriter, r *http.Request) { var flattener Flattener if flatten == "vertical" { flattener = VerticalFlattener{} + } else if flatten == "multitable" { + flattener = NewMultiTableFlattener() } else { flattener = HorizontalFlattener{} } From 6763acb15c0836232e54e2db633564e5ab400595 Mon Sep 17 00:00:00 2001 From: poundifdef Date: Fri, 5 Apr 2024 16:39:06 -0400 Subject: [PATCH 04/10] start of mongo --- go.mod | 7 +++ go.sum | 11 +++++ pkg/destinations/destinations.go | 11 +++-- pkg/destinations/mongodb/mongodb.go | 70 +++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 pkg/destinations/mongodb/mongodb.go diff --git a/go.mod b/go.mod index ac74f291..2cd4f3cf 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v23.5.26+incompatible // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect @@ -117,6 +118,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/moby/term v0.5.0 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.12 // indirect @@ -131,10 +133,15 @@ require ( github.com/stretchr/testify v1.9.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.mongodb.org/mongo-driver v1.14.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/go.sum b/go.sum index 5182aa3c..046004c3 100644 --- a/go.sum +++ b/go.sum @@ -173,6 +173,8 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg= github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -276,6 +278,7 @@ github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= @@ -346,9 +349,14 @@ github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= @@ -356,6 +364,7 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -365,6 +374,8 @@ github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= +go.mongodb.org/mongo-driver v1.14.0 h1:P98w8egYRjYe3XDjxhYJagTokP/H6HzlsnojRgZRd80= +go.mongodb.org/mongo-driver v1.14.0/go.mod h1:Vzb0Mk/pa7e6cWw85R4F/endUC3u0U9jGcNU603k65c= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 h1:4Pp6oUg3+e/6M4C0A/3kJ2VYa++dsWVTtGgLVj5xtHg= diff --git a/pkg/destinations/destinations.go b/pkg/destinations/destinations.go index a26ad67f..f5cd5dc2 100644 --- a/pkg/destinations/destinations.go +++ b/pkg/destinations/destinations.go @@ -14,6 +14,7 @@ import ( "github.com/scratchdata/scratchdata/pkg/destinations/bigquery" "github.com/scratchdata/scratchdata/pkg/destinations/clickhouse" "github.com/scratchdata/scratchdata/pkg/destinations/duckdb" + "github.com/scratchdata/scratchdata/pkg/destinations/mongodb" "github.com/scratchdata/scratchdata/pkg/destinations/redshift" ) @@ -63,14 +64,16 @@ func (m *DestinationManager) TestCredentials(creds config.Destination) error { var dest Destination var err error switch creds.Type { - case "duckdb": - dest, err = duckdb.OpenServer(creds.Settings) + case "bigquery": + dest, err = bigquery.OpenServer(creds.Settings) case "clickhouse": dest, err = clickhouse.OpenServer(creds.Settings) + case "duckdb": + dest, err = duckdb.OpenServer(creds.Settings) + case "mongodb": + dest, err = mongodb.OpenServer(creds.Settings) case "redshift": dest, err = redshift.OpenServer(creds.Settings) - case "bigquery": - dest, err = bigquery.OpenServer(creds.Settings) default: err = errors.New("Invalid destination type") } diff --git a/pkg/destinations/mongodb/mongodb.go b/pkg/destinations/mongodb/mongodb.go new file mode 100644 index 00000000..2004b8ac --- /dev/null +++ b/pkg/destinations/mongodb/mongodb.go @@ -0,0 +1,70 @@ +package mongodb + +import ( + "context" + "io" + "log" + "strings" + + "github.com/scratchdata/scratchdata/models" + "github.com/scratchdata/scratchdata/pkg/util" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type MongoDBServer struct { +} + +func (s *MongoDBServer) QueryNDJson(query string, writer io.Writer) error { return nil } +func (s *MongoDBServer) QueryJSON(query string, writer io.Writer) error { + query = `db.xy.find({"a":{}})` + + tokens := strings.SplitN(query, ".", 3) + log.Print(tokens) + collection := tokens[1] + + start := strings.Index(query, "(") + end := strings.LastIndex(query, ")") + + jsonList := "[" + query[start+1:end] + "]" + log.Print(jsonList) + + serverAPI := options.ServerAPI(options.ServerAPIVersion1) + opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI) + // Create a new client and connect to the server + client, err := mongo.Connect(context.TODO(), opts) + if err != nil { + return err + } + + db := client.Database("x") + // db.RunCommand() + coll := db.Collection(collection) + + // coll.Run + // coll.Find() + + coll.Find() + + // isFind := strings.HasPrefix(tokens[2], "find(") + // isAggregate := strings.HasPrefix(tokens[2], "aggregate(") + + return nil +} +func (s *MongoDBServer) QueryCSV(query string, writer io.Writer) error { return nil } + +func (s *MongoDBServer) Tables() ([]string, error) { return nil, nil } +func (s *MongoDBServer) Columns(table string) ([]models.Column, error) { return nil, nil } + +func (s *MongoDBServer) CreateEmptyTable(name string) error { return nil } +func (s *MongoDBServer) CreateColumns(table string, filePath string) error { return nil } +func (s *MongoDBServer) InsertFromNDJsonFile(table string, filePath string) error { return nil } + +func (s *MongoDBServer) Close() error { + return nil +} + +func OpenServer(settings map[string]any) (*MongoDBServer, error) { + srv := util.ConfigToStruct[MongoDBServer](settings) + return srv, nil +} From 2f0064b43803fea541dad2d636a226a5fed17fe1 Mon Sep 17 00:00:00 2001 From: poundifdef Date: Sun, 7 Apr 2024 09:19:14 -0400 Subject: [PATCH 05/10] updates --- pkg/destinations/mongodb/mongodb.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/destinations/mongodb/mongodb.go b/pkg/destinations/mongodb/mongodb.go index 2004b8ac..7fa87d5e 100644 --- a/pkg/destinations/mongodb/mongodb.go +++ b/pkg/destinations/mongodb/mongodb.go @@ -2,12 +2,14 @@ package mongodb import ( "context" + "fmt" "io" "log" "strings" "github.com/scratchdata/scratchdata/models" "github.com/scratchdata/scratchdata/pkg/util" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) @@ -30,7 +32,7 @@ func (s *MongoDBServer) QueryJSON(query string, writer io.Writer) error { log.Print(jsonList) serverAPI := options.ServerAPI(options.ServerAPIVersion1) - opts := options.Client().ApplyURI(uri).SetServerAPIOptions(serverAPI) + opts := options.Client().ApplyURI("x").SetServerAPIOptions(serverAPI) // Create a new client and connect to the server client, err := mongo.Connect(context.TODO(), opts) if err != nil { @@ -44,7 +46,15 @@ func (s *MongoDBServer) QueryJSON(query string, writer io.Writer) error { // coll.Run // coll.Find() - coll.Find() + cursor, _ := coll.Find(context.Background(), "") + + for cursor.Next(context.TODO()) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + log.Fatal(err) + } + fmt.Printf("%+v\n", result) + } // isFind := strings.HasPrefix(tokens[2], "find(") // isAggregate := strings.HasPrefix(tokens[2], "aggregate(") From ed2422082838f716ce9e942914ac699c29d5fd2d Mon Sep 17 00:00:00 2001 From: poundifdef Date: Mon, 8 Apr 2024 13:06:40 -0400 Subject: [PATCH 06/10] working ndjson --- pkg/api/data.go | 3 + pkg/destinations/destinations.go | 2 + pkg/destinations/mongodb/mongodb.go | 86 ++++++++++++++++++----------- 3 files changed, 58 insertions(+), 33 deletions(-) diff --git a/pkg/api/data.go b/pkg/api/data.go index d879a92a..d2e4de07 100644 --- a/pkg/api/data.go +++ b/pkg/api/data.go @@ -99,6 +99,9 @@ func (a *ScratchDataAPIStruct) executeQueryAndStreamData(ctx context.Context, w case "csv": w.Header().Set("Content-Type", "text/csv") return dest.QueryCSV(query, w) + case "ndjson": + w.Header().Set("Content-Type", "text/plain") + return dest.QueryNDJson(query, w) default: w.Header().Set("Content-Type", "application/json") return dest.QueryJSON(query, w) diff --git a/pkg/destinations/destinations.go b/pkg/destinations/destinations.go index f5cd5dc2..df819c7a 100644 --- a/pkg/destinations/destinations.go +++ b/pkg/destinations/destinations.go @@ -113,6 +113,8 @@ func (m *DestinationManager) Destination(ctx context.Context, databaseID int64) dest, err = redshift.OpenServer(settings) case "bigquery": dest, err = bigquery.OpenServer(settings) + case "mongodb": + dest, err = mongodb.OpenServer(settings) } if err != nil { diff --git a/pkg/destinations/mongodb/mongodb.go b/pkg/destinations/mongodb/mongodb.go index 7fa87d5e..d273a0ae 100644 --- a/pkg/destinations/mongodb/mongodb.go +++ b/pkg/destinations/mongodb/mongodb.go @@ -2,10 +2,8 @@ package mongodb import ( "context" - "fmt" + "encoding/json" "io" - "log" - "strings" "github.com/scratchdata/scratchdata/models" "github.com/scratchdata/scratchdata/pkg/util" @@ -15,49 +13,58 @@ import ( ) type MongoDBServer struct { -} - -func (s *MongoDBServer) QueryNDJson(query string, writer io.Writer) error { return nil } -func (s *MongoDBServer) QueryJSON(query string, writer io.Writer) error { - query = `db.xy.find({"a":{}})` - - tokens := strings.SplitN(query, ".", 3) - log.Print(tokens) - collection := tokens[1] + URI string `mapstructure:"uri"` + Database string `mapstructure:"database"` - start := strings.Index(query, "(") - end := strings.LastIndex(query, ")") + db *mongo.Database +} - jsonList := "[" + query[start+1:end] + "]" - log.Print(jsonList) +func (s *MongoDBServer) QueryNDJson(query string, writer io.Writer) error { + coll := s.db.Collection("transactions") - serverAPI := options.ServerAPI(options.ServerAPIVersion1) - opts := options.Client().ApplyURI("x").SetServerAPIOptions(serverAPI) - // Create a new client and connect to the server - client, err := mongo.Connect(context.TODO(), opts) + cursor, err := coll.Find(context.TODO(), bson.M{}, options.Find().SetLimit(2)) if err != nil { return err } - db := client.Database("x") - // db.RunCommand() - coll := db.Collection(collection) - - // coll.Run - // coll.Find() - - cursor, _ := coll.Find(context.Background(), "") - for cursor.Next(context.TODO()) { var result bson.M if err := cursor.Decode(&result); err != nil { - log.Fatal(err) + return err + } + + v, err := json.Marshal(result) + if err != nil { + return err } - fmt.Printf("%+v\n", result) + writer.Write(v) + writer.Write([]byte("\n")) } - // isFind := strings.HasPrefix(tokens[2], "find(") - // isAggregate := strings.HasPrefix(tokens[2], "aggregate(") + return nil +} +func (s *MongoDBServer) QueryJSON(query string, writer io.Writer) error { + // Create a buffered reader for efficient reading + // ndjsonReader := strings.NewReader(ndjsonData) + // reader := bufio.NewReader(ndjsonReader) + + // // Create a buffered writer for efficient writing + // writer := bufio.NewWriter(jsonWriter) + + // query = `db.xy.find({"a":{}})` + + // tokens := strings.SplitN(query, ".", 3) + // log.Print(tokens) + // collection := tokens[1] + + // start := strings.Index(query, "(") + // end := strings.LastIndex(query, ")") + + // jsonList := "[" + query[start+1:end] + "]" + // log.Print(jsonList) + + // // isFind := strings.HasPrefix(tokens[2], "find(") + // // isAggregate := strings.HasPrefix(tokens[2], "aggregate(") return nil } @@ -76,5 +83,18 @@ func (s *MongoDBServer) Close() error { func OpenServer(settings map[string]any) (*MongoDBServer, error) { srv := util.ConfigToStruct[MongoDBServer](settings) + + serverAPI := options.ServerAPI(options.ServerAPIVersion1) + opts := options.Client().ApplyURI(srv.URI).SetServerAPIOptions(serverAPI) + + client, err := mongo.Connect(context.TODO(), opts) + if err != nil { + return nil, err + } + + db := client.Database(srv.Database) + + srv.db = db + return srv, nil } From 153e349c7913badd19b4782ac9ce30b439f0a89f Mon Sep 17 00:00:00 2001 From: poundifdef Date: Tue, 9 Apr 2024 08:01:20 -0400 Subject: [PATCH 07/10] progress --- pkg/api/auth.go | 1 + pkg/api/data.go | 1 + pkg/storage/database/gorm/gorm.go | 11 +++- pkg/workers/copier.go | 102 +++++++++++++++++++++++++++--- 4 files changed, 104 insertions(+), 11 deletions(-) diff --git a/pkg/api/auth.go b/pkg/api/auth.go index 83cd7114..69386c3c 100644 --- a/pkg/api/auth.go +++ b/pkg/api/auth.go @@ -39,6 +39,7 @@ func (a *ScratchDataAPIStruct) AuthMiddleware(next http.Handler) http.Handler { } else { // Otherwise, this API key is specific to a user keyDetails, err := a.storageServices.Database.GetAPIKeyDetails(r.Context(), hashedKey) + log.Print(keyDetails.Destination.TeamID) if err != nil { w.WriteHeader(http.StatusUnauthorized) diff --git a/pkg/api/data.go b/pkg/api/data.go index d2e4de07..e0cd55a2 100644 --- a/pkg/api/data.go +++ b/pkg/api/data.go @@ -41,6 +41,7 @@ func (a *ScratchDataAPIStruct) Copy(w http.ResponseWriter, r *http.Request) { message.SourceID = a.AuthGetDatabaseID(r.Context()) teamId := a.AuthGetTeamID(r.Context()) + log.Print(teamId) // Make sure the destination db is the same team as the source _, err = a.storageServices.Database.GetDestination(r.Context(), teamId, message.DestinationID) diff --git a/pkg/storage/database/gorm/gorm.go b/pkg/storage/database/gorm/gorm.go index 5f53ad1a..12fecff5 100644 --- a/pkg/storage/database/gorm/gorm.go +++ b/pkg/storage/database/gorm/gorm.go @@ -208,8 +208,16 @@ func (s *Gorm) GetDestinations(c context.Context, userId uint) ([]models.Destina } func (s *Gorm) GetDestination(c context.Context, teamId, destId uint) (models.Destination, error) { + + var destinations []models.Destination + // res := s.db.Where("team_id = ?", teamId).Find(&destinations) + res := s.db.Find(&destinations) + for _, d := range destinations { + log.Print(d.ID, d.Type, d.TeamID) + } + var dest models.Destination - res := s.db.First(&dest, "team_id = ? AND id = ?", teamId, destId) + res = s.db.First(&dest, "team_id = ? AND id = ?", teamId, destId) if res.Error != nil { return dest, res.Error } @@ -265,6 +273,7 @@ func (s *Gorm) CreateUser(email string, source string, details string) (*models. func (s *Gorm) GetAPIKeyDetails(ctx context.Context, hashedKey string) (models.APIKey, error) { var dbKey models.APIKey + // TODO: get destination and team tx := s.db.First(&dbKey, "hashed_api_key = ?", hashedKey) if tx.RowsAffected == 0 { diff --git a/pkg/workers/copier.go b/pkg/workers/copier.go index b3c31947..f49e13c4 100644 --- a/pkg/workers/copier.go +++ b/pkg/workers/copier.go @@ -1,11 +1,13 @@ package workers import ( + "bufio" "context" "os" "path/filepath" "github.com/rs/zerolog/log" + "github.com/scratchdata/scratchdata/pkg/api" "github.com/scratchdata/scratchdata/pkg/util" ) @@ -56,24 +58,104 @@ func (w *ScratchDataWorker) CopyData(sourceId int64, query string, destId uint, return err } - err = dest.CreateEmptyTable(destTable) - if err != nil { - return err - } + // Algorithm demuxes NDJSON that is nested into JSON with multiple tables + + flattener := api.NewMultiTableFlattener() + fds := map[string]*util.ChunkedWriter{} + maxCapacity := 1024 * 1024 * 10 + buf := make([]byte, maxCapacity) for _, f := range files { path := filepath.Join(localFolder, f.Name()) - err = dest.CreateColumns(destTable, path) + fd, err := os.Open(path) if err != nil { - log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to create columns") - continue + return err } - err = dest.InsertFromNDJsonFile(destTable, path) + + scanner := bufio.NewScanner(fd) + scanner.Buffer(buf, maxCapacity) + + for scanner.Scan() { + line := scanner.Text() + flatItems, err := flattener.Flatten(destTable, line) + if err != nil { + return err + } + + for _, item := range flatItems { + + tableFd, ok := fds[item.Table] + if !ok { + ndjsonPath := filepath.Join(localFolder, "tables", item.Table) + err = os.MkdirAll(ndjsonPath, os.ModePerm) + if err != nil { + return err + } + + tableFd := util.NewChunkedWriter(w.Config.MaxBulkQuerySizeBytes, w.Config.BulkChunkSizeBytes, ndjsonPath) + fds[item.Table] = tableFd + } + + tableFd.Write([]byte(item.JSON)) + tableFd.Write([]byte("\n")) + } + } + + fd.Close() + } + + for table, fd := range fds { + err = fd.Close() + if err != nil { + return err + } + + folderName := filepath.Join(localFolder, "tables", table) + files, err := os.ReadDir(folderName) + if err != nil { + return err + } + + err = dest.CreateEmptyTable(destTable) if err != nil { - log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to insert data") - continue + return err + } + + for _, f := range files { + path := filepath.Join(folderName, f.Name()) + err = dest.CreateColumns(table, path) + if err != nil { + log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", table).Msg("Unable to create columns") + continue + } + err = dest.InsertFromNDJsonFile(table, path) + if err != nil { + log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", table).Msg("Unable to insert data") + continue + } } + } + // This is the regular algorithm without demuxing 1 table to many. Assumes the NDJSON is flat. + // err = dest.CreateEmptyTable(destTable) + // if err != nil { + // return err + // } + + // for _, f := range files { + // path := filepath.Join(localFolder, f.Name()) + // err = dest.CreateColumns(destTable, path) + // if err != nil { + // log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to create columns") + // continue + // } + // err = dest.InsertFromNDJsonFile(destTable, path) + // if err != nil { + // log.Error().Err(err).Int64("source_id", sourceId).Uint("dest_id", destId).Str("table", destTable).Msg("Unable to insert data") + // continue + // } + // } + return nil } From e64f9e1f67b48ab5ece10dc2416187a457b8dccd Mon Sep 17 00:00:00 2001 From: poundifdef Date: Tue, 9 Apr 2024 08:01:33 -0400 Subject: [PATCH 08/10] update --- pkg/api/data.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/api/data.go b/pkg/api/data.go index e0cd55a2..d2e4de07 100644 --- a/pkg/api/data.go +++ b/pkg/api/data.go @@ -41,7 +41,6 @@ func (a *ScratchDataAPIStruct) Copy(w http.ResponseWriter, r *http.Request) { message.SourceID = a.AuthGetDatabaseID(r.Context()) teamId := a.AuthGetTeamID(r.Context()) - log.Print(teamId) // Make sure the destination db is the same team as the source _, err = a.storageServices.Database.GetDestination(r.Context(), teamId, message.DestinationID) From e65b7f89de05434f45417e471012ab3e6a09bbcb Mon Sep 17 00:00:00 2001 From: poundifdef Date: Tue, 9 Apr 2024 09:07:49 -0400 Subject: [PATCH 09/10] update --- pkg/storage/database/gorm/gorm.go | 5 ++++- pkg/workers/copier.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/storage/database/gorm/gorm.go b/pkg/storage/database/gorm/gorm.go index 12fecff5..38b61aa5 100644 --- a/pkg/storage/database/gorm/gorm.go +++ b/pkg/storage/database/gorm/gorm.go @@ -275,11 +275,14 @@ func (s *Gorm) GetAPIKeyDetails(ctx context.Context, hashedKey string) (models.A var dbKey models.APIKey // TODO: get destination and team - tx := s.db.First(&dbKey, "hashed_api_key = ?", hashedKey) + tx := s.db.Joins("Destination.Team").First(&dbKey, "hashed_api_key = ?", hashedKey) if tx.RowsAffected == 0 { return models.APIKey{}, errors.New("api key not found") } + log.Print(dbKey.Destination) + log.Print(dbKey.Destination.Team) + return dbKey, nil } diff --git a/pkg/workers/copier.go b/pkg/workers/copier.go index f49e13c4..dc6faeb3 100644 --- a/pkg/workers/copier.go +++ b/pkg/workers/copier.go @@ -92,7 +92,7 @@ func (w *ScratchDataWorker) CopyData(sourceId int64, query string, destId uint, return err } - tableFd := util.NewChunkedWriter(w.Config.MaxBulkQuerySizeBytes, w.Config.BulkChunkSizeBytes, ndjsonPath) + tableFd = util.NewChunkedWriter(w.Config.MaxBulkQuerySizeBytes, w.Config.BulkChunkSizeBytes, ndjsonPath) fds[item.Table] = tableFd } From c7770570459f88ef5a5a39a13c5049a33801dbc3 Mon Sep 17 00:00:00 2001 From: poundifdef Date: Tue, 9 Apr 2024 11:12:02 -0400 Subject: [PATCH 10/10] bug fix --- pkg/workers/copier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workers/copier.go b/pkg/workers/copier.go index dc6faeb3..f5021b07 100644 --- a/pkg/workers/copier.go +++ b/pkg/workers/copier.go @@ -116,7 +116,7 @@ func (w *ScratchDataWorker) CopyData(sourceId int64, query string, destId uint, return err } - err = dest.CreateEmptyTable(destTable) + err = dest.CreateEmptyTable(table) if err != nil { return err }