From 31cd5f19e1b65de4dd1664ae0d1b4c138acc3237 Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Fri, 10 May 2024 13:05:31 +0200 Subject: [PATCH] Get rid of ProcessFacetsQuery, use one common ProcessQuery (#75) Final `ProcessQuery` unification. One common method --- quesma/clickhouse/quesma_communicator.go | 28 ++++++++++-------------- quesma/quesma/search.go | 16 +++++++------- quesma/quesma/termsenum/terms_enum.go | 2 +- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/quesma/clickhouse/quesma_communicator.go b/quesma/clickhouse/quesma_communicator.go index 9dd17f896..7acf63e5c 100644 --- a/quesma/clickhouse/quesma_communicator.go +++ b/quesma/clickhouse/quesma_communicator.go @@ -26,22 +26,28 @@ func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error return rows, err } +// GetAllColumns - returns all columns for a given table including non-schema fields +func (lm *LogManager) GetAllColumns(table *Table, query *model.Query) []string { + columns, err := table.extractColumns(query, true) + if err != nil { + logger.Error().Msgf("Failed to extract columns from query: %v", err) + return nil + } + return columns +} + // ProcessQuery - only WHERE clause // TODO query param should be type safe Query representing all parts of // sql statement that were already parsed and not string from which // we have to extract again different parts like where clause and columns to build a proper result -func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { +func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *model.Query, columns []string) ([]model.QueryResultRow, error) { colNames, err := table.extractColumns(query, false) rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields)) if err != nil { return nil, err } - resultColumns, err := table.extractColumns(query, true) - if err != nil { - return nil, err - } - rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), resultColumns, rowToScan) + rows, err := executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), columns, rowToScan) if err == nil { for _, row := range rows { row.Index = table.Name @@ -50,16 +56,6 @@ func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *mod return rows, err } -// TODO add support for autocomplete for attributes, if we'll find it needed -func (lm *LogManager) ProcessFacetsQuery(ctx context.Context, table *Table, query *model.Query) ([]model.QueryResultRow, error) { - colNames, err := table.extractColumns(query, false) - if err != nil { - return nil, err - } - rowToScan := make([]interface{}, len(colNames)+len(query.NonSchemaFields)) - return executeQuery(ctx, lm, table.Name, query.StringFromColumns(colNames), []string{"key", "doc_count"}, rowToScan) -} - var random = rand.New(rand.NewSource(time.Now().UnixNano())) const slowQueryThreshold = 30 * time.Second diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index b55bcb3c4..cc0dfd060 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -241,14 +241,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin fieldName = "*" } listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size) - hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery) + hitsFallback, err = q.logManager.ProcessQuery(ctx, table, listQuery, q.logManager.GetAllColumns(table, listQuery)) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery) pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt) - countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery) + countResult, err := q.logManager.ProcessQuery(ctx, table, countQuery, q.logManager.GetAllColumns(table, listQuery)) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery) pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) @@ -461,26 +461,26 @@ func (q *QueryRunner) searchWorkerCommon(ctx context.Context, queryTranslator IQ switch queryInfo.Typ { case model.CountAsync: fullQuery = queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery) + hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) case model.Facets, model.FacetsNumeric: // queryInfo = (Facets, fieldName, Limit results, Limit last rows to look into) fullQuery = queryTranslator.BuildFacetsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2) - hits, err = q.logManager.ProcessFacetsQuery(dbQueryCtx, table, fullQuery) + hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, []string{"key", "doc_count"}) case model.ListByField: // queryInfo = (ListByField, fieldName, 0, LIMIT) fullQuery = queryTranslator.BuildNRowsQuery(queryInfo.FieldName, simpleQuery, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery) + hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) case model.ListAllFields: // queryInfo = (ListAllFields, "*", 0, LIMIT) fullQuery = queryTranslator.BuildNRowsQuery("*", simpleQuery, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery) + hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) case model.Normal: fullQuery = queryTranslator.BuildSimpleSelectQuery(simpleQuery.Sql.Stmt, queryInfo.I2) - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery) + hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, fullQuery, q.logManager.GetAllColumns(table, fullQuery)) default: logger.ErrorWithCtx(ctx).Msgf("unknown query type: %v, query body: %v", queryInfo.Typ, body) @@ -543,7 +543,7 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega for _, agg := range aggregations { logger.InfoWithCtx(ctx).Msg(agg.String()) // I'd keep for now until aggregations work fully sqls += agg.Query.String() + "\n" - rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query) + rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg.Query, q.logManager.GetAllColumns(table, &agg.Query)) if err != nil { logger.ErrorWithCtx(ctx).Msg(err.Error()) continue diff --git a/quesma/quesma/termsenum/terms_enum.go b/quesma/quesma/termsenum/terms_enum.go index 97f236830..5dd115baa 100644 --- a/quesma/quesma/termsenum/terms_enum.go +++ b/quesma/quesma/termsenum/terms_enum.go @@ -37,7 +37,7 @@ func handleTermsEnumRequest(ctx context.Context, reqBody []byte, qt *queryparser dbQueryCtx, cancel := context.WithCancel(ctx) // TODO this will be used to cancel goroutine that is executing the query _ = cancel - if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery); err2 != nil { + if rows, err2 := qt.ClickhouseLM.ProcessQuery(dbQueryCtx, qt.Table, selectQuery, qt.ClickhouseLM.GetAllColumns(qt.Table, selectQuery)); err2 != nil { logger.Error().Msgf("terms enum failed - error processing SQL query [%s]", err2) result, err = json.Marshal(emptyTermsEnumResponse()) } else {