-            {!isHidden &&
-                {preserveOriginals && <FlatButton
+            {!hidden_mark && meta.source_id != 'ui-upload' &&
+                !meta.extra.some(item => item.key === 'from_container') && <FlatButton
                    label={localization.searchPage.downloadLabel}
                    title={localization.searchPage.downloadDescriptionLabel}
                    primary={true}
                    onTouchTap={() => { window.open(downloadUri) }}
-                />}
+                />
            }
            {!hidden_mark && <FlatButton onTouchTap={() => hideFile(fileId)}
-            />}
-            {(isHidden || hidden_mark) && <FlatButton
-                label={localization.searchPage.restoreLabel}
-                title={localization.searchPage.restoreDescriptionLabel}
-                primary={true}
-                onTouchTap={() => showFile(fileId)}
-            />}
+            />}
            }
diff --git a/FrontEnd/src/components/Search/components/Views/TableView/TableView.js b/FrontEnd/src/components/Search/components/Views/TableView/TableView.js
index 340df2e..431d0f9 100644
--- a/FrontEnd/src/components/Search/components/Views/TableView/TableView.js
+++ b/FrontEnd/src/components/Search/components/Views/TableView/TableView.js
@@ -55,7 +55,7 @@ class TableView extends Component {
key={hit.file_id}
hit={hit}
thumbnailUri={urls.ambarWebApiGetThumbnail(hit.sha256)}
- downloadUri={urls.ambarWebApiGetFile(hit.meta.download_uri)}
+ downloadUri={urls.ambarWebApiGetFile(hit.meta.full_name)}
{...this.props}
/>
)}
diff --git a/FrontEnd/src/components/Search/components/Views/TableView/components/TableRow/TableRow.js b/FrontEnd/src/components/Search/components/Views/TableView/components/TableRow/TableRow.js
index c31af76..5a32628 100644
--- a/FrontEnd/src/components/Search/components/Views/TableView/components/TableRow/TableRow.js
+++ b/FrontEnd/src/components/Search/components/Views/TableView/components/TableRow/TableRow.js
@@ -80,10 +80,10 @@ class TableRowResult extends Component {
-                {preserveOriginals && <IconButton onTouchTap={() => { window.open(downloadUri) }}
+                {!hidden_mark && meta.source_id != 'ui-upload' && !meta.extra.some(item => item.key === 'from_container') && <IconButton onTouchTap={() => { window.open(downloadUri) }}
                    title={localization.searchPage.downloadDescriptionLabel}>
-                </IconButton>}
+                </IconButton>}
                {
@@ -94,10 +94,7 @@ class TableRowResult extends Component {
                {!hidden_mark && <IconButton onTouchTap={() => hideFile(fileId)} title={localization.searchPage.removeLabel}>
-                </IconButton>}
-                {(isHidden || hidden_mark) && <IconButton onTouchTap={() => showFile(fileId)} title={localization.searchPage.restoreLabel}>
-
-                </IconButton>}
+                </IconButton>}
            )
)
diff --git a/FrontEnd/src/layouts/MainLayout/components/RateUs/RateUs.js b/FrontEnd/src/layouts/MainLayout/components/RateUs/RateUs.js
index 4aea9d8..d7fefe8 100644
--- a/FrontEnd/src/layouts/MainLayout/components/RateUs/RateUs.js
+++ b/FrontEnd/src/layouts/MainLayout/components/RateUs/RateUs.js
@@ -35,10 +35,10 @@ class RateUs extends Component {
- Let's spread the word that Ambar is awesome! Help us make Ambar even better, follow us on Twitter or give us your stars on Github.
+ Let's spread the word that Ambar is awesome! Help us make Ambar even better - give us your stars on Github.
- Together we will build the best document search system in the world!
+ Together we will build the best document search engine in the world!
                <FlatButton onTouchTap={() => goToUrl('https://github.com/RD17/ambar')}
                    icon={}
-                />
-                <FlatButton onTouchTap={() => goToUrl('https://twitter.com/intent/tweet?text=%23Ambar%20is%20awesome%20%23DocumentSearchSystem!%20Check%20it%20out%20on%20https%3A%2F%2Fambar.cloud')}
-                    icon={}
-                />
+                />
diff --git a/FrontEnd/src/utils/urls.js b/FrontEnd/src/utils/urls.js
index 29c81dc..17d1fe1 100644
--- a/FrontEnd/src/utils/urls.js
+++ b/FrontEnd/src/utils/urls.js
@@ -5,7 +5,7 @@ const init = (apiHost) => {
ambarWebApiSearchByStringQuery: (query, page, size) => `${apiHost}/api/search?query=${encodeURIComponent(query)}&page=${page}&size=${size}`,
ambarWebApiLoadContentHightlight: (fileId, query) => `${apiHost}/api/search/${fileId}/?query=${encodeURIComponent(query)}`,
ambarWebApiLoadFullContentHightlight: (fileId, query) => `${apiHost}/api/search/${fileId}/full?query=${encodeURIComponent(query)}`,
- ambarWebApiGetFile: (metaId) => `${apiHost}/api/files/${metaId}`,
+ ambarWebApiGetFile: (fullPath) => `${apiHost}/api/files/download?path=${encodeURIComponent(fullPath)}`,
ambarWebApiGetFileText: (metaId) => `${apiHost}/api/files/${metaId}/text`,
ambarWebApiGetCrawlers: () => `${apiHost}/api/crawlers`,
diff --git a/Install.md b/Install.md
index 0334a64..d96fc40 100644
--- a/Install.md
+++ b/Install.md
@@ -1,9 +1,9 @@
# Install docker and docker-compose
To install and configure Ambar you need expertise in unix, Docker and Docker Compose.
-If you have any difficulties installing and running Ambar you can request a dedicated support session by mailing us on hello@ambar.cloud
+If you have any difficulties installing and running Ambar, you can request a dedicated support session by mailing us at [hello@ambar.cloud](mailto:hello@ambar.cloud)
-Please refer to official [Docker](https://docs.docker.com/install/) and [Docker Compose](https://docs.docker.com/compose/install/) installation instructions.
+Please refer to the official [Docker](https://docs.docker.com/install/) and [Docker Compose](https://docs.docker.com/compose/install/) installation instructions.
To check if everything is installed correctly, please run:
@@ -44,11 +44,40 @@ Then modify it:
## Set up your crawlers
-- Find ```crawler0``` block - this is a template for your new crawler
-- Replace ```${crawlerName}``` with desired name for your crawler (only lowercase latin letters and dashes are supported)
+- Find the ```${crawlerName}``` block - this is the template for your new crawler
+- Replace ```${crawlerName}``` with the desired name for your crawler (only lowercase latin letters and dashes are supported). Make sure the service block name and the crawler name are the same
- Replace ```${pathToCrawl}``` with the path to a local folder to be crawled; if you want to crawl SMB or FTP, just mount it with standard unix tools
-You can add additional crawlers by copying ```crawler0``` segment and editing its settings (don't forget to edit the service name, e.g. to ```crawler1```).
+### Optional settings
+- `ignoreFolders` - ignore folders by [glob pattern](https://github.com/isaacs/node-glob#glob-primer)
+- `ignoreExtensions` - ignore file extensions by [glob pattern](https://github.com/isaacs/node-glob#glob-primer) (default: .{exe,dll})
+- `ignoreFileNames` - ignore file names by [glob pattern](https://github.com/isaacs/node-glob#glob-primer) (default: ~*)
+- `maxFileSize` - max file size (default: 300mb)
+
+### Crawler configuration example:
+```
+Docs:
+ depends_on:
+ serviceapi:
+ condition: service_healthy
+ image: ambar/ambar-local-crawler
+ restart: always
+ networks:
+ - internal_network
+ expose:
+ - "8082"
+ environment:
+ - name=Docs
+ - ignoreFolders=**/ForSharing/**
+ - ignoreExtensions=.{exe,dll,rar}
+ - ignoreFileNames=*backup*
+ - maxFileSize=15mb
+ volumes:
+ - /media/Docs:/usr/data
+```
+
+
+You can add more crawlers by copying the ```${crawlerName}``` segment and editing its settings (don't forget to edit the service name) - see the example below.
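+
+For instance, a second crawler copied from the template might look like this (the ```Photos``` service name and ```/media/Photos``` path are placeholders - use your own):
+
+```
+Photos:
+  depends_on:
+    serviceapi:
+      condition: service_healthy
+  image: ambar/ambar-local-crawler
+  restart: always
+  networks:
+    - internal_network
+  expose:
+    - "8082"
+  environment:
+    - name=Photos
+  volumes:
+    - /media/Photos:/usr/data
+```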
# Start Ambar
@@ -58,4 +87,4 @@ To start Ambar run ```docker-compose up -d```.
Ambar UI will be accessible at ```http://${ambarHostIpAddress}/```
-If you have any difficulties installing and running Ambar you can request a dedicated support session by mailing us on hello@ambar.cloud
\ No newline at end of file
+If you have any difficulties installing and running Ambar, you can request a dedicated support session by mailing us at [hello@ambar.cloud](mailto:hello@ambar.cloud)
\ No newline at end of file
diff --git a/LocalCrawler/Dockerfile b/LocalCrawler/Dockerfile
index dfb86bf..4308495 100644
--- a/LocalCrawler/Dockerfile
+++ b/LocalCrawler/Dockerfile
@@ -8,7 +8,7 @@ COPY . .
RUN yarn install
RUN yarn run build
-CMD node dist
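+# raise the V8 old-space heap limit to ~8 GB; the crawler keeps its watch state in memory and can outgrow the default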
+CMD node --max-old-space-size=8096 dist
HEALTHCHECK --interval=5s --timeout=30s --retries=50 \
CMD curl -f localhost:8082/api/ || exit 1
\ No newline at end of file
diff --git a/LocalCrawler/package.json b/LocalCrawler/package.json
index d5ef4ea..8239243 100644
--- a/LocalCrawler/package.json
+++ b/LocalCrawler/package.json
@@ -21,6 +21,7 @@
"author": "RD17",
"license": "MIT",
"dependencies": {
+ "amqplib": "^0.5.2",
"babel-eslint": "^7.1.0",
"babel-polyfill": "^6.26.0",
"body-parser": "^1.13.3",
@@ -32,6 +33,7 @@
"eslint-plugin-promise": "^3.3.0",
"express": "^4.13.3",
"idempotent-babel-polyfill": "^0.1.1",
+ "minimatch": "^3.0.4",
"moment": "^2.15.0",
"request": "^2.85.0",
"request-promise-native": "^1.0.5",
diff --git a/LocalCrawler/src/api/index.js b/LocalCrawler/src/api/index.js
index aa55be0..686c58d 100644
--- a/LocalCrawler/src/api/index.js
+++ b/LocalCrawler/src/api/index.js
@@ -1,6 +1,10 @@
import { version, name, description } from '../../package.json'
import { Router } from 'express'
-// import config from '../config'
+import fs from 'fs'
+import path from 'path'
+import config from '../config'
+
+import * as ApiProxy from '../services/ApiProxy'
export default () => {
let api = Router()
@@ -11,7 +15,42 @@ export default () => {
version: version,
description: description
})
- })
+ })
+
+ api.get('/download', (req, res) => {
+ const filePath = req.query.path
+
+ if (!filePath) {
+ res.sendStatus(400)
+ return
+ }
+
+ let absolutePath = null
+ let doesFileExist = false
+
+ try {
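+ // resolve the crawler-relative path against the mounted crawl root (config.crawlPath)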
+ absolutePath = path.join(config.crawlPath, filePath)
+ doesFileExist = fs.existsSync(absolutePath)
+ } catch (error) {
+ ApiProxy.logData(config.name, 'error', `Error: ${error}`)
+ res.status(500).json({ error: error })
+ return
+ }
+
+ if (!doesFileExist) {
+ res.sendStatus(404)
+ return
+ }
+
+ res.download(absolutePath, (error) => {
+ if (error) {
+ if (!res.headersSent) {
+ res.status(500).json({ error: error })
+ }
+ ApiProxy.logData(config.name, 'error', `[${absolutePath}] Error: ${error}`)
+ }
+ })
+ })
return api
}
diff --git a/LocalCrawler/src/config.js b/LocalCrawler/src/config.js
index 2171f6d..7593fb7 100644
--- a/LocalCrawler/src/config.js
+++ b/LocalCrawler/src/config.js
@@ -1,11 +1,14 @@
const defaultConfig = {
"port": 8082,
"bodyLimit": "10mb",
- "crawlPath": "C:\\Dropbox\\Development\\Git\\Ambar\\LocalCrawler\\node_modules",
- "apiUrl": "http://ambar:8081",
- "allowedFilesRegex": '(\\.doc[a-z]*$)|(\\.xls[a-z]*$)|(\\.txt$)|(\\.pst$)|(\\.csv$)|(\\.htm[a-z]*$)|(\\.ppt[a-z]*$)|(\\.pdf$)|(\\.msg$)|(\\.zip$)|(\\.eml$)|(\\.rtf$)|(\\.md$)|(\\.png$)|(\\.bmp$)|(\\.tif[f]*$)|(\\.jp[e]*g$)',
- "name": "nodemodules-crawler",
- "maxFileSize": "30mb"
+ "crawlPath": "/usr/data",
+ "apiUrl": "http://serviceapi:8080",
+ "ignoreFolders": "**/test/**",
+ "ignoreExtensions": ".{exe,dll}",
+ "ignoreFileNames": "~*",
+ "name": "localhost",
+ "maxFileSize": "300mb",
+ "rabbitHost": "amqp://rabbit"
}
let config = null
diff --git a/LocalCrawler/src/index.js b/LocalCrawler/src/index.js
index 5aca9e1..98ec5ec 100644
--- a/LocalCrawler/src/index.js
+++ b/LocalCrawler/src/index.js
@@ -4,31 +4,47 @@ import cors from 'cors'
import bodyParser from 'body-parser'
import api from './api'
import config from './config'
+import cluster from 'cluster'
import 'babel-core/register'
import 'idempotent-babel-polyfill'
import { FileWatchService, ApiProxy } from './services'
-ApiProxy.logData(config.name, 'info', 'Crawler initialized')
-FileWatchService.startWatch()
+let app = null
+if (cluster.isMaster) {
+ ApiProxy.logData(config.name, 'info', 'API runs on master thread')
+ ApiProxy.logData(config.name, 'info', 'Creating fork for the file-watcher process')
+ cluster.fork()
+
+ app = express()
+ app.server = http.createServer(app)
+
+ app.use(cors({
+ credentials: true,
+ origin: true
+ }))
+
+ app.use(bodyParser.json({
+ limit: config.bodyLimit
+ }))
+
+ // api router
+ app.use('/api', api())
+ app.server.listen(process.env.PORT || config.port)
+
+ console.log(`Started API on ${app.server.address().address}:${app.server.address().port}`)
+
+
+} else {
+ ApiProxy.logData(config.name, 'info', 'File-watcher runs on worker thread')
+
+ FileWatchService.startWatch()
+ .catch(err => {
+ ApiProxy.logData(config.name, 'error', `Error: ${err}`)
+ process.exit(1)
+ })
+}
+
+export default app
-let app = express()
-app.server = http.createServer(app)
-
-app.use(cors({
- credentials: true,
- origin: true
-}))
-
-app.use(bodyParser.json({
- limit: config.bodyLimit
-}))
-
-// api router
-app.use('/api', api())
-app.server.listen(process.env.PORT || config.port)
-
-console.log(`Started on ${app.server.address().address}:${app.server.address().port}`)
-
-export default app
\ No newline at end of file
diff --git a/LocalCrawler/src/services/ApiProxy.js b/LocalCrawler/src/services/ApiProxy.js
index 6085ada..22d2d3f 100644
--- a/LocalCrawler/src/services/ApiProxy.js
+++ b/LocalCrawler/src/services/ApiProxy.js
@@ -23,11 +23,11 @@ export const doesParsedContentExist = (sha) => new Promise((resolve, reject) =>
})
})
-export const doesFileMetaExist = (meta) => new Promise((resolve, reject) => {
+export const doesFileMetaExist = (meta) => new Promise((resolve, reject) => {
const options = {
uri: `${config.apiUrl}/api/files/meta/exists`,
- method: 'POST',
- body: JSON.stringify(meta),
+ method: 'POST',
+ body: JSON.stringify(meta),
headers: {
'Content-Type': 'application/json'
},
@@ -36,7 +36,7 @@ export const doesFileMetaExist = (meta) => new Promise((resolve, reject) => {
}
return request(options)
- .then(response => {
+ .then(response => {
resolve(response.statusCode === 200)
})
.catch(err => {
@@ -48,15 +48,15 @@ export const doesFileMetaExist = (meta) => new Promise((resolve, reject) => {
export const addFileMeta = (meta, sha, crawlerId) => new Promise((resolve, reject) => {
const options = {
uri: `${config.apiUrl}/api/files/meta/${encodeURIComponent(sha)}/${encodeURIComponent(crawlerId)}`,
- method: 'POST',
- body: JSON.stringify(meta),
+ method: 'POST',
+ body: JSON.stringify(meta),
headers: {
'Content-Type': 'application/json'
- }
+ }
}
return request(options)
- .then(() => { resolve()})
+ .then(() => { resolve() })
.catch(err => {
console.log(err)
reject(err)
@@ -66,16 +66,16 @@ export const addFileMeta = (meta, sha, crawlerId) => new Promise((resolve, rejec
export const addFileContent = (filePath, sha) => new Promise((resolve, reject) => {
const options = {
uri: `${config.apiUrl}/api/files/content/${encodeURIComponent(sha)}`,
- method: 'POST',
+ method: 'POST',
formData: [
fs.createReadStream(filePath)
],
simple: false,
- resolveWithFullResponse: true
+ resolveWithFullResponse: true
}
return request(options)
- .then(response => {
+ .then(response => {
resolve(response.statusCode === 304 || response.statusCode === 201)
})
.catch(err => {
@@ -88,26 +88,26 @@ export const logData = (sourceId, type, message) => new Promise((resolve, reject
console.log(`[${type}] ${message}`)
- const record = {
+ const record = {
source_id: sourceId,
type: type,
message: message,
created_datetime: moment().format('YYYY-MM-DD HH:mm:ss.SSS')
- }
+ }
const options = {
uri: `${config.apiUrl}/api/logs`,
- method: 'POST',
- body: JSON.stringify(record),
+ method: 'POST',
+ body: JSON.stringify(record),
headers: {
'Content-Type': 'application/json'
- }
+ }
}
- return request(options)
- .then(() => { resolve()})
+ request(options)
+ .then(() => resolve())
.catch(err => {
- console.log(err)
+ console.error(err)
reject(err)
})
})
\ No newline at end of file
diff --git a/LocalCrawler/src/services/FileService.js b/LocalCrawler/src/services/FileService.js
deleted file mode 100644
index faada57..0000000
--- a/LocalCrawler/src/services/FileService.js
+++ /dev/null
@@ -1,15 +0,0 @@
-import crypto from 'crypto'
-import fs from 'fs'
-
-export const getFileMeta = (path) => fs.stat(path)
-
-export const getFileSha = (path) => new Promise((resolve) => {
- const shaSum = crypto.createHash('sha256')
- const readStream = fs.ReadStream(path)
-
- readStream.on('data', (data) => shaSum.update(data))
- readStream.on('end', () => {
- const result = shaSum.digest('hex')
- resolve(result)
- })
-})
\ No newline at end of file
diff --git a/LocalCrawler/src/services/FileWatchService.js b/LocalCrawler/src/services/FileWatchService.js
index 111122d..d1a7520 100644
--- a/LocalCrawler/src/services/FileWatchService.js
+++ b/LocalCrawler/src/services/FileWatchService.js
@@ -3,50 +3,28 @@ import chokidar from 'chokidar'
import path from 'path'
import config from '../config'
import bytes from 'bytes'
+import minimatch from 'minimatch'
import * as ApiProxy from './ApiProxy'
-import * as FileService from './FileService'
-
-const WAIT_MS = 500
-
-let tasks = []
-
-const allowedFilesRegex = new RegExp(config.allowedFilesRegex, 'gi')
-
-export const startWatch = () => {
- processTasks()
-
- chokidar.watch(config.crawlPath, { usePolling: true })
- .on('all', (event, pathToFile, stat) => {
- if (event === 'add' || event === 'change') {
- tasks.push({ event, pathToFile, stat })
- }
+import * as QueueProxy from './QueueProxy'
+
+export const startWatch = () => new Promise((resolve, reject) => {
+ QueueProxy.initRabbit()
+ .then(() => {
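+ // awaitWriteFinish holds add/change events until the file size stabilizes, so partially written files are not picked up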
+ chokidar.watch(config.crawlPath, { usePolling: true, awaitWriteFinish: true })
+ .on('error', error => {
+ ApiProxy.logData(config.name, 'error', `Chokidar error: ${error}`)
+ })
+ .on('all', (event, pathToFile, stat) => {
+ if (event === 'add' || event === 'change' || event === 'unlink') {
+ addTask(event, pathToFile, stat)
+ }
+ })
+ })
+ .catch(err => {
+ ApiProxy.logData(config.name, 'error', `Error: ${err}`)
+ reject(err)
})
-}
-
-const processTasks = async () => {
- //eslint-disable-next-line no-constant-condition
- while (true) {
- if (tasks.length === 0) {
- await sleep()
- continue
- }
-
- const { pathToFile, stat } = tasks[0]
- tasks = tasks.slice(1)
-
- try {
- await tryAddFile(pathToFile, stat)
- } catch (err) {
- await ApiProxy.logData(config.name, 'error', `failed to submit ${pathToFile}`)
- }
- }
-}
-
-const sleep = () => new Promise((resolve) => {
- setTimeout(() => {
- resolve()
- }, WAIT_MS)
})
const shouldIgnore = (pathToFile, stat) => {
@@ -57,60 +35,59 @@ const shouldIgnore = (pathToFile, stat) => {
const maxFileSizeBytes = bytes.parse(config.maxFileSize)
if (stat.size === 0) {
+ ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: file.size != 0`)
return true
}
if (stat.size > maxFileSizeBytes) {
+ ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: file.size [${bytes(stat.size)}] > maxFileSize [${bytes(maxFileSizeBytes)}]`)
return true
}
- if (!allowedFilesRegex.test(pathToFile)) {
+ const extName = path.extname(pathToFile)
+ if (!extName) {
+ ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: File should have extension`)
return true
}
- if (!path.extname(pathToFile)) {
+ if (config.ignoreExtensions && minimatch(extName, config.ignoreExtensions)) {
+ ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: ignore extensions [${config.ignoreExtensions}]`)
+ return true
+ }
+
+ const fileName = path.basename(pathToFile, extName)
+ if (config.ignoreFileNames && minimatch(fileName, config.ignoreFileNames)) {
+ ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: ignore fileNames [${config.ignoreFileNames}]`)
+ return true
+ }
+
+ const dirName = path.dirname(pathToFile)
+ if (config.ignoreFolders && minimatch(dirName, config.ignoreFolders)) {
+ ApiProxy.logData(config.name, 'verbose', `${pathToFile} ignoring. Rule: ignore folders [${config.ignoreFolders}]`)
return true
}
return false
}
-const tryAddFile = async (pathToFile, stat) => {
+const addTask = (event, pathToFile, stat) => {
let normalizedPath = path.normalize(pathToFile)
normalizedPath = `//${normalizedPath.replace(config.crawlPath, config.name)}`.replace(/\\/g, '/')
if (shouldIgnore(normalizedPath, stat)) {
- await ApiProxy.logData(config.name, 'info', `${normalizedPath} ignoring`)
return
}
const meta = {
full_name: normalizedPath,
- updated_datetime: moment(stat.mtime).format('YYYY-MM-DD HH:mm:ss.SSS'),
- created_datetime: moment(stat.atime).format('YYYY-MM-DD HH:mm:ss.SSS'),
+ updated_datetime: !stat ? '' : moment(stat.mtime).format('YYYY-MM-DD HH:mm:ss.SSS'),
+ created_datetime: !stat ? '' : moment(stat.atime).format('YYYY-MM-DD HH:mm:ss.SSS'),
source_id: config.name,
short_name: path.basename(normalizedPath),
extension: path.extname(normalizedPath),
extra: []
}
- const metaExists = await ApiProxy.doesFileMetaExist(meta)
- if (metaExists) {
- await ApiProxy.logData(config.name, 'info', `${normalizedPath} meta exists`)
- return
- }
-
- const sha = await FileService.getFileSha(pathToFile)
- const contentExist = await ApiProxy.doesParsedContentExist(sha)
-
- if (contentExist) {
- await ApiProxy.logData(config.name, 'info', `${normalizedPath} - content found`)
- } else {
- await ApiProxy.addFileContent(pathToFile, sha)
- await ApiProxy.logData(config.name, 'info', `${normalizedPath} - content added`)
- }
-
- await ApiProxy.addFileMeta(meta, sha, config.name)
- await ApiProxy.logData(config.name, 'info', `${normalizedPath} - meta updated`)
+ QueueProxy.enqueueMessage({ event: event, meta: meta })
}
\ No newline at end of file
diff --git a/LocalCrawler/src/services/QueueProxy.js b/LocalCrawler/src/services/QueueProxy.js
new file mode 100644
index 0000000..38f9288
--- /dev/null
+++ b/LocalCrawler/src/services/QueueProxy.js
@@ -0,0 +1,38 @@
+import amqp from 'amqplib'
+import config from '../config'
+
+const AMBAR_PIPELINE_EXCHANGE = "AMBAR_PIPELINE_EXCHANGE"
+
+let channel = null
+
+const getPipelineMessagePriority = (fileName) => {
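+ // images and PDFs are likely OCR work: they get priority 1 while everything else gets 2 (RabbitMQ delivers higher numbers first), so quick files jump the queue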
+ const regex = /(\.jp[e]*g$)|(\.png$)|(\.bmp$)|(\.tif[f]*$)|(\.pdf$)/i
+ const priority = regex.test(fileName) ? 1 : 2
+
+ return priority
+}
+
+export const enqueueMessage = (message) => {
+ const fileName = message.fileName || message.meta.short_name
+ const priority = getPipelineMessagePriority(fileName)
+ channel.publish(AMBAR_PIPELINE_EXCHANGE, '', Buffer.from(JSON.stringify(message)), { priority: priority })
+}
+
+export const initRabbit = () => new Promise((resolve, reject) => {
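+ // heartbeat=0 disables AMQP heartbeats so the connection is not dropped during long idle stretches between publishes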
+ amqp.connect(`${config.rabbitHost}?heartbeat=0`)
+ .then((conn) => {
+ conn.on('error', (err) => {
+ //eslint-disable-next-line no-console
+ console.error('Rabbit error!')
+ throw err
+ })
+
+ return conn.createChannel()
+ .then(ch => {
+ channel = ch
+ resolve()
+ })
+ })
+ .catch(err => reject(err))
+})
+
diff --git a/LocalCrawler/yarn.lock b/LocalCrawler/yarn.lock
index d7dcbc8..64382fd 100644
--- a/LocalCrawler/yarn.lock
+++ b/LocalCrawler/yarn.lock
@@ -54,6 +54,16 @@ ajv@^5.1.0:
fast-json-stable-stringify "^2.0.0"
json-schema-traverse "^0.3.0"
+amqplib@^0.5.2:
+ version "0.5.2"
+ resolved "http://192.168.1.113:4873/amqplib/-/amqplib-0.5.2.tgz#d2d7313c7ffaa4d10bcf1e6252de4591b6cc7b63"
+ dependencies:
+ bitsyntax "~0.0.4"
+ bluebird "^3.4.6"
+ buffer-more-ints "0.0.2"
+ readable-stream "1.x >=1.1.9"
+ safe-buffer "^5.0.1"
+
ansi-escapes@^1.1.0:
version "1.4.0"
resolved "https://registry.yarnpkg.com/ansi-escapes/-/ansi-escapes-1.4.0.tgz#d3a8a83b319aa67793662b13e761c7911422306e"
@@ -854,6 +864,10 @@ balanced-match@^0.4.1:
version "0.4.2"
resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-0.4.2.tgz#cb3f3e3c732dc0f01ee70b403f302e61d7709838"
+balanced-match@^1.0.0:
+ version "1.0.0"
+ resolved "http://192.168.1.113:4873/balanced-match/-/balanced-match-1.0.0.tgz#89b4d199ab2bee49de164ea02b89ce462d71b767"
+
base@^0.11.1:
version "0.11.2"
resolved "http://192.168.1.113:4873/base/-/base-0.11.2.tgz#7bde5ced145b6d551a90db87f83c558b4eb48a8f"
@@ -876,12 +890,22 @@ binary-extensions@^1.0.0:
version "1.8.0"
resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-1.8.0.tgz#48ec8d16df4377eae5fa5884682480af4d95c774"
+bitsyntax@~0.0.4:
+ version "0.0.4"
+ resolved "http://192.168.1.113:4873/bitsyntax/-/bitsyntax-0.0.4.tgz#eb10cc6f82b8c490e3e85698f07e83d46e0cba82"
+ dependencies:
+ buffer-more-ints "0.0.2"
+
block-stream@*:
version "0.0.9"
resolved "https://registry.yarnpkg.com/block-stream/-/block-stream-0.0.9.tgz#13ebfe778a03205cfe03751481ebb4b3300c126a"
dependencies:
inherits "~2.0.0"
+bluebird@^3.4.6:
+ version "3.5.1"
+ resolved "http://192.168.1.113:4873/bluebird/-/bluebird-3.5.1.tgz#d9551f9de98f1fcda1e683d17ee91a0602ee2eb9"
+
body-parser@^1.13.3:
version "1.16.1"
resolved "https://registry.yarnpkg.com/body-parser/-/body-parser-1.16.1.tgz#51540d045adfa7a0c6995a014bb6b1ed9b802329"
@@ -922,6 +946,13 @@ brace-expansion@^1.0.0:
balanced-match "^0.4.1"
concat-map "0.0.1"
+brace-expansion@^1.1.7:
+ version "1.1.11"
+ resolved "http://192.168.1.113:4873/brace-expansion/-/brace-expansion-1.1.11.tgz#3c7fcbf529d87226f3d2f52b966ff5271eb441dd"
+ dependencies:
+ balanced-match "^1.0.0"
+ concat-map "0.0.1"
+
braces@^1.8.2:
version "1.8.5"
resolved "https://registry.yarnpkg.com/braces/-/braces-1.8.5.tgz#ba77962e12dff969d6b76711e914b737857bf6a7"
@@ -945,6 +976,10 @@ braces@^2.3.0, braces@^2.3.1:
split-string "^3.0.2"
to-regex "^3.0.1"
+buffer-more-ints@0.0.2:
+ version "0.0.2"
+ resolved "http://192.168.1.113:4873/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz#26b3885d10fa13db7fc01aae3aab870199e0124c"
+
buffer-shims@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/buffer-shims/-/buffer-shims-1.0.0.tgz#9978ce317388c649ad8793028c3477ef044a8b51"
@@ -2276,6 +2311,10 @@ is-windows@^1.0.2:
version "1.0.2"
resolved "http://192.168.1.113:4873/is-windows/-/is-windows-1.0.2.tgz#d1850eb9791ecd18e6182ce12a30f396634bb19d"
+isarray@0.0.1:
+ version "0.0.1"
+ resolved "http://192.168.1.113:4873/isarray/-/isarray-0.0.1.tgz#8a18acfca9a8f4177e09abfc6038939b05d1eedf"
+
isarray@1.0.0, isarray@^1.0.0, isarray@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/isarray/-/isarray-1.0.0.tgz#bb935d48582cba168c06834957a54a3e07124f11"
@@ -2580,6 +2619,12 @@ minimatch@^3.0.0, minimatch@^3.0.2:
dependencies:
brace-expansion "^1.0.0"
+minimatch@^3.0.4:
+ version "3.0.4"
+ resolved "http://192.168.1.113:4873/minimatch/-/minimatch-3.0.4.tgz#5166e286457f03306064be5497e8dbb0c3d32083"
+ dependencies:
+ brace-expansion "^1.1.7"
+
minimist@0.0.8:
version "0.0.8"
resolved "https://registry.yarnpkg.com/minimist/-/minimist-0.0.8.tgz#857fcabfc3397d2625b8228262e86aa7a011b05d"
@@ -3039,6 +3084,15 @@ read-all-stream@^3.0.0:
pinkie-promise "^2.0.0"
readable-stream "^2.0.0"
+"readable-stream@1.x >=1.1.9":
+ version "1.1.14"
+ resolved "http://192.168.1.113:4873/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9"
+ dependencies:
+ core-util-is "~1.0.0"
+ inherits "~2.0.1"
+ isarray "0.0.1"
+ string_decoder "~0.10.x"
+
readable-stream@^2.0.0, "readable-stream@^2.0.0 || ^1.1.13", readable-stream@^2.0.2, readable-stream@^2.2.2:
version "2.2.2"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.2.2.tgz#a9e6fec3c7dda85f8bb1b3ba7028604556fc825e"
diff --git a/Pipeline/.vscode/launch.json b/Pipeline/.vscode/launch.json
new file mode 100644
index 0000000..f3400eb
--- /dev/null
+++ b/Pipeline/.vscode/launch.json
@@ -0,0 +1,117 @@
+{
+ // Use IntelliSense to learn about possible attributes.
+ // Hover to view descriptions of existing attributes.
+ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+ "version": "0.2.0",
+ "configurations": [
+ {
+ "name": "Python: Current File",
+ "pythonPath": "${config:python.pythonPath}",
+ "type": "python",
+ "request": "launch",
+ "program": "${file}"
+ },
+ {
+ "name": "Python: Attach",
+ "type": "python",
+ "request": "attach",
+ "localRoot": "${workspaceFolder}",
+ "remoteRoot": "${workspaceFolder}",
+ "port": 3000,
+ "secret": "my_secret",
+ "host": "localhost"
+ },
+ {
+ "name": "Python: Terminal (integrated)",
+ "type": "python",
+ "request": "launch",
+ "program": "${file}",
+ "console": "integratedTerminal"
+ },
+ {
+ "name": "Python: Terminal (external)",
+ "type": "python",
+ "request": "launch",
+ "program": "${file}",
+ "console": "externalTerminal"
+ },
+ {
+ "name": "Python: Django",
+ "type": "python",
+ "request": "launch",
+ "program": "${workspaceFolder}/manage.py",
+ "args": [
+ "runserver",
+ "--noreload",
+ "--nothreading"
+ ],
+ "debugOptions": [
+ "RedirectOutput",
+ "Django"
+ ]
+ },
+ {
+ "name": "Python: Flask (0.11.x or later)",
+ "type": "python",
+ "request": "launch",
+ "module": "flask",
+ "env": {
+ "FLASK_APP": "${workspaceFolder}/app.py"
+ },
+ "args": [
+ "run",
+ "--no-debugger",
+ "--no-reload"
+ ]
+ },
+ {
+ "name": "Python: Module",
+ "type": "python",
+ "request": "launch",
+ "module": "module.name"
+ },
+ {
+ "name": "Python: Pyramid",
+ "type": "python",
+ "request": "launch",
+ "args": [
+ "${workspaceFolder}/development.ini"
+ ],
+ "debugOptions": [
+ "RedirectOutput",
+ "Pyramid"
+ ]
+ },
+ {
+ "name": "Python: Watson",
+ "type": "python",
+ "request": "launch",
+ "program": "${workspaceFolder}/console.py",
+ "args": [
+ "dev",
+ "runserver",
+ "--noreload=True"
+ ]
+ },
+ {
+ "name": "Python: All debug Options",
+ "type": "python",
+ "request": "launch",
+ "pythonPath": "${config:python.pythonPath}",
+ "program": "${file}",
+ "module": "module.name",
+ "env": {
+ "VAR1": "1",
+ "VAR2": "2"
+ },
+ "envFile": "${workspaceFolder}/.env",
+ "args": [
+ "arg1",
+ "arg2"
+ ],
+ "debugOptions": [
+ "RedirectOutput"
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/Pipeline/Dockerfile b/Pipeline/Dockerfile
index f7119a3..9ab7550 100644
--- a/Pipeline/Dockerfile
+++ b/Pipeline/Dockerfile
@@ -36,7 +36,7 @@ RUN mkdir /pst-temp
ENV JAVA_HOME /usr/lib/jvm/default-java
-CMD python ./pipeline.py -id $id -api_url $api_url -rabbit_host $rabbit_host
+CMD python ./pipeline.py
HEALTHCHECK --interval=5s --timeout=30s --retries=50 \
CMD if (pidof -x python > /dev/null) then (exit 0) else (exit 1) fi
\ No newline at end of file
diff --git a/Pipeline/apiproxy.py b/Pipeline/apiproxy.py
index 37d1559..055b424 100644
--- a/Pipeline/apiproxy.py
+++ b/Pipeline/apiproxy.py
@@ -1,10 +1,12 @@
import io
import re
import requests
+import urllib.parse
class ApiProxy:
- def __init__(self, ApiUrl, ApiCallTimeoutSeconds):
+ def __init__(self, ApiUrl, WebApiUrl, ApiCallTimeoutSeconds):
self.apiUrl = ApiUrl
+ self.webApiUrl = WebApiUrl
self.apiCallTimeoutSeconds = ApiCallTimeoutSeconds
def GetTaggingRules(self):
@@ -58,6 +60,18 @@ def CheckIfParsedAmbarFileContentExists(self, Sha):
apiResp.message = str(ex)
return apiResp
+ def CheckIfMetaExists(self, Meta):
+ apiResp = RestApiResponse()
+ try:
+ apiUri = '{0}/api/files/meta/exists'.format(self.apiUrl)
+ req = requests.post(apiUri, json = Meta, timeout = self.apiCallTimeoutSeconds)
+ apiResp.result = 'ok'
+ apiResp.code = req.status_code
+ except requests.exceptions.RequestException as ex:
+ apiResp.result = 'error'
+ apiResp.message = str(ex)
+ return apiResp
+
def CreateAmbarFileContent(self, FileData, Sha256):
apiResp = RestApiResponse()
try:
@@ -91,10 +105,10 @@ def RemoveFileContent(self, sha):
apiResp.message = str(ex)
return apiResp
- def GetFileContent(self, sha):
+ def GetFileContent(self, Sha):
apiResp = RestApiResponse()
try:
- apiUri = '{0}/api/files/content/{1}'.format(self.apiUrl, sha)
+ apiUri = '{0}/api/files/content/{1}'.format(self.apiUrl, Sha)
req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
if req.status_code == 200:
contentDispositionHeader = req.headers['content-disposition']
@@ -113,6 +127,76 @@ def GetFileContent(self, sha):
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
+
+ def HideFile(self, FileId):
+ apiResp = RestApiResponse()
+ try:
+ apiUri = '{0}/api/files/hide/{1}'.format(self.webApiUrl, FileId)
+ req = requests.put(apiUri, timeout = self.apiCallTimeoutSeconds)
+ try:
+ apiResp.message = req.text
+ except:
+ pass
+ apiResp.result = 'ok'
+ apiResp.code = req.status_code
+ except requests.exceptions.RequestException as ex:
+ apiResp.result = 'error'
+ apiResp.message = str(ex)
+ return apiResp
+
+ def UnhideFile(self, FileId):
+ apiResp = RestApiResponse()
+ try:
+ apiUri = '{0}/api/files/unhide/{1}'.format(self.webApiUrl, FileId)
+ req = requests.put(apiUri, timeout = self.apiCallTimeoutSeconds)
+ try:
+ apiResp.message = req.text
+ except:
+ pass
+ apiResp.result = 'ok'
+ apiResp.code = req.status_code
+ except requests.exceptions.RequestException as ex:
+ apiResp.result = 'error'
+ apiResp.message = str(ex)
+ return apiResp
+
+ def DownloadFile(self, FullName):
+ apiResp = RestApiResponse()
+ try:
+ apiUri = '{0}/api/files/download?path={1}'.format(self.webApiUrl, urllib.parse.quote_plus(FullName))
+ req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
+ if req.status_code == 200:
+ apiResp.payload = req.content
+ else:
+ try:
+ apiResp.message = req.text
+ except:
+ pass
+ apiResp.result = 'ok'
+ apiResp.code = req.status_code
+ except requests.exceptions.RequestException as ex:
+ apiResp.result = 'error'
+ apiResp.message = str(ex)
+ return apiResp
+
+ def DownloadFileBySha(self, Sha):
+ apiResp = RestApiResponse()
+ try:
+ apiUri = '{0}/api/files/download?sha={1}'.format(self.webApiUrl, urllib.parse.quote_plus(Sha))
+ req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
+ if req.status_code == 200:
+ apiResp.payload = req.content
+ else:
+ try:
+ apiResp.message = req.text
+ except:
+ pass
+ apiResp.result = 'ok'
+ apiResp.code = req.status_code
+ except requests.exceptions.RequestException as ex:
+ apiResp.result = 'error'
+ apiResp.message = str(ex)
+ return apiResp
def GetParsedFileContent(self, Sha):
apiResp = RestApiResponse()
@@ -175,13 +259,12 @@ def EnqueueAmbarFileMeta(self, AmbarFileMeta, Sha, CrawlerId):
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
-
- def SubmitProcessedFile(self, FileId, AmbarFileBytes):
+
+ def AddMetaIdToCache(self, MetaId):
apiResp = RestApiResponse()
try:
- files = {'file': (FileId, AmbarFileBytes)}
- apiUri = '{0}/api/files/file/{1}/processed'.format(self.apiUrl, FileId)
- req = requests.post(apiUri, files=files, timeout = self.apiCallTimeoutSeconds)
+ apiUri = '{0}/api/files/meta/{1}/processed'.format(self.apiUrl, MetaId)
+ req = requests.post(apiUri, timeout = self.apiCallTimeoutSeconds)
try:
apiResp.message = req.text
except:
@@ -192,12 +275,13 @@ def SubmitProcessedFile(self, FileId, AmbarFileBytes):
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
-
- def RemoveAutoTags(self, FileId):
+
+ def SubmitProcessedFile(self, FileId, AmbarFileBytes):
apiResp = RestApiResponse()
try:
- apiUri = '{0}/api/files/autotags/{1}'.format(self.apiUrl, FileId)
- req = requests.delete(apiUri, timeout = self.apiCallTimeoutSeconds)
+ files = {'file': (FileId, AmbarFileBytes)}
+ apiUri = '{0}/api/files/file/{1}/processed'.format(self.apiUrl, FileId)
+ req = requests.post(apiUri, files=files, timeout = self.apiCallTimeoutSeconds)
try:
apiResp.message = req.text
except:
@@ -208,12 +292,12 @@ def RemoveAutoTags(self, FileId):
apiResp.result = 'error'
apiResp.message = str(ex)
return apiResp
-
- def AddFileTag(self, FileId, TagType, TagName):
+
+ def RemoveAutoTags(self, FileId):
apiResp = RestApiResponse()
try:
- apiUri = '{0}/api/tags/service/{1}/{2}/{3}'.format(self.apiUrl, FileId, TagType, TagName)
- req = requests.post(apiUri, timeout = self.apiCallTimeoutSeconds)
+ apiUri = '{0}/api/files/autotags/{1}'.format(self.apiUrl, FileId)
+ req = requests.delete(apiUri, timeout = self.apiCallTimeoutSeconds)
try:
apiResp.message = req.text
except:
@@ -225,11 +309,11 @@ def AddFileTag(self, FileId, TagType, TagName):
apiResp.message = str(ex)
return apiResp
- def CallExternalNER(self, ExternalNERUri, FileId, Sha):
+ def AddFileTag(self, FileId, TagType, TagName):
apiResp = RestApiResponse()
try:
- body = { 'fileId': FileId, 'sha': Sha }
- req = requests.post(ExternalNERUri, json=body, timeout = self.apiCallTimeoutSeconds)
+ apiUri = '{0}/api/tags/service/{1}/{2}/{3}'.format(self.apiUrl, FileId, TagType, TagName)
+ req = requests.post(apiUri, timeout = self.apiCallTimeoutSeconds)
try:
apiResp.message = req.text
except:
@@ -271,29 +355,6 @@ def SubmitThumbnail(self, ThumbId, ThumbData):
apiResp.message = str(ex)
return apiResp
- def GetAmbarCrawlerFileRegex(self, CrawlerId):
- apiResp = RestApiResponse()
- try:
- apiUri = '{0}/api/crawlers/{1}'.format(self.apiUrl, CrawlerId)
- req = requests.get(apiUri, timeout = self.apiCallTimeoutSeconds)
- if req.status_code == 200:
- try:
- apiResp.payload = req.json()['file_regex']
- except:
- apiResp.result = 'error'
- apiResp.message = str(ex)
- else:
- try:
- apiResp.message = req.text
- except:
- pass
- apiResp.result = 'ok'
- apiResp.code = req.status_code
- except requests.exceptions.RequestException as ex:
- apiResp.result = 'error'
- apiResp.message = str(ex)
- return apiResp
-
class RestApiResponse:
def __init__(self):
self.result = 'ok'
diff --git a/Pipeline/containerprocessors/archiveprocessor.py b/Pipeline/containerprocessors/archiveprocessor.py
index c13eba5..8c970d9 100644
--- a/Pipeline/containerprocessors/archiveprocessor.py
+++ b/Pipeline/containerprocessors/archiveprocessor.py
@@ -14,9 +14,6 @@ def __init__(self, Logger, ApiProxy):
def Process(self, FileData, FileMeta, SourceId):
self.logger.LogMessage('verbose','unzipping {0}'.format(FileMeta.full_name))
- ##TODO: Get fileRegex from crawler settings
- fileRegex = re.compile('(\\.doc[a-z]*$)|(\\.xls[a-z]*$)|(\\.txt$)|(\\.csv$)|(\\.htm[a-z]*$)|(\\.ppt[a-z]*$)|(\\.pdf$)|(\\.msg$)|(\\.zip$)|(\\.eml$)|(\\.rtf$)|(\\.md$)|(\\.png$)|(\\.bmp$)|(\\.tif[f]*$)|(\\.jp[e]*g$)',re.I)
-
try:
with ZipFile(io.BytesIO(FileData)) as zipFile:
for zipFileInfo in zipFile.infolist():
@@ -25,10 +22,6 @@ def Process(self, FileData, FileMeta, SourceId):
except:
unicodeName = zipFileInfo.filename
- if not fileRegex.search(unicodeName):
- self.logger.LogMessage('verbose','ignoring {0}/{1}'.format(FileMeta.full_name, unicodeName))
- continue
-
fullNameInArchive = '{0}/{1}'.format(FileMeta.full_name, unicodeName)
createUpdateTime = datetime(
zipFileInfo.date_time[0],
@@ -80,7 +73,7 @@ def Process(self, FileData, FileMeta, SourceId):
self.logger.LogMessage('verbose', 'content found {0}'.format(fullNameInArchive))
## sending meta back to queue
- fileMeta = AmbarFileMeta.InitWithoutId(createUpdateTime, createUpdateTime, unicodeName, fullNameInArchive, FileMeta.source_id)
+ fileMeta = AmbarFileMeta.InitWithoutId(createUpdateTime, createUpdateTime, unicodeName, fullNameInArchive, FileMeta.source_id, [{'key': 'from_container', 'value': 'true'}])
apiResp = self.apiProxy.EnqueueAmbarFileMeta(fileMeta, sha, SourceId)
@@ -92,10 +85,6 @@ def Process(self, FileData, FileMeta, SourceId):
self.logger.LogMessage('verbose', 'bad meta, ignoring... {0}'.format(fileMeta.full_name))
continue
- if apiResp.InsufficientStorage:
- self.logger.LogMessage('verbose', 'insufficient storage'.format(fileMeta.full_name))
- continue
-
if not apiResp.Ok:
self.logger.LogMessage('error', 'unexpected response on adding meta {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
continue
diff --git a/Pipeline/containerprocessors/pstprocessor.py b/Pipeline/containerprocessors/pstprocessor.py
index 0b005c0..07bf04d 100644
--- a/Pipeline/containerprocessors/pstprocessor.py
+++ b/Pipeline/containerprocessors/pstprocessor.py
@@ -135,10 +135,9 @@ def Process(self, FileData, FileMeta, SourceId):
self.logger.LogMessage('verbose', 'content found {0}'.format(fullNameInArchive))
# sending meta back to queue
- fileMeta = AmbarFileMeta.InitWithoutId(FileMeta.created_datetime, FileMeta.updated_datetime, fileName, fullNameInArchive, FileMeta.source_id)
+ fileMeta = AmbarFileMeta.InitWithoutId(FileMeta.created_datetime, FileMeta.updated_datetime, fileName, fullNameInArchive, FileMeta.source_id, [{'key': 'from_container', 'value': 'true'}])
- apiResp = self.apiProxy.EnqueueAmbarFileMeta(
- fileMeta, sha, SourceId)
+ apiResp = self.apiProxy.EnqueueAmbarFileMeta(fileMeta, sha, SourceId)
if not apiResp.Success:
self.logger.LogMessage('error', 'error adding meta {0} {1}'.format(
@@ -149,10 +148,6 @@ def Process(self, FileData, FileMeta, SourceId):
self.logger.LogMessage('verbose', 'bad meta, ignoring... {0}'.format(fileMeta.full_name))
continue
- if apiResp.InsufficientStorage:
- self.logger.LogMessage('verbose', 'insufficient storage'.format(fileMeta.full_name))
- continue
-
if not apiResp.Ok:
self.logger.LogMessage('error', 'unexpected response on adding meta {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
continue
diff --git a/Pipeline/model.py b/Pipeline/model.py
index faec44c..c4a8c95 100644
--- a/Pipeline/model.py
+++ b/Pipeline/model.py
@@ -167,7 +167,7 @@ def ParseFullNameIntoParts(cls, FullName):
return fullNameParts
@classmethod
- def InitFromDictWithId(cls, MetaDict):
+ def Init(cls, MetaDict):
amFileMeta = cls()
try:
amFileMeta.full_name = MetaDict['full_name']
@@ -178,7 +178,7 @@ def InitFromDictWithId(cls, MetaDict):
amFileMeta.source_id = MetaDict['source_id']
amFileMeta.created_datetime = MetaDict['created_datetime']
amFileMeta.updated_datetime = MetaDict['updated_datetime']
- amFileMeta.id = MetaDict['id']
+ amFileMeta.id = sha256('{0}{1}{2}{3}'.format(MetaDict['source_id'],MetaDict['full_name'],MetaDict['created_datetime'],MetaDict['updated_datetime']).encode('utf-8')).hexdigest()
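+ ## same id derivation as ServiceApi's generateMetaId: sha256 over source_id, full_name and both datetimes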
## non serializable content
amFileMeta.initialized = True
amFileMeta.message = 'ok'
@@ -189,14 +189,14 @@ def InitFromDictWithId(cls, MetaDict):
@classmethod
def InitWithoutId(cls, CreateTime, UpdateTime, ShortName, FullName,
- AmbarCrawlerId):
+ AmbarCrawlerId, Extra = []):
amFileMeta = cls()
try:
amFileMeta.full_name = FullName
amFileMeta.full_name_parts = AmbarFileMeta.ParseFullNameIntoParts(FullName)
amFileMeta.short_name = ShortName
amFileMeta.extension = path.splitext(ShortName)[1] if path.splitext(ShortName)[1] != '' else path.splitext(ShortName)[0]
- amFileMeta.extra = []
+ amFileMeta.extra = Extra
amFileMeta.source_id = AmbarCrawlerId
if type(CreateTime) is str:
amFileMeta.created_datetime = CreateTime
diff --git a/Pipeline/pipeline.py b/Pipeline/pipeline.py
index 504756f..dec9994 100644
--- a/Pipeline/pipeline.py
+++ b/Pipeline/pipeline.py
@@ -12,7 +12,6 @@
import gc
import io
import sys
-import argparse
import os
import time
import hashlib
@@ -26,21 +25,19 @@
API_CALL_TIMEOUT_SECONDS = 1200
PARSE_TIMEOUT_SECONDS = 1200
-parser = argparse.ArgumentParser()
-parser.add_argument('-id', default='0')
-parser.add_argument('-api_url', default='http://ambar:8081')
-parser.add_argument('-rabbit_host', default='amqp://ambar')
+pipelineId = os.getenv('id', '0')
+apiUrl = os.getenv('api_url', 'http://serviceapi:8081')
+webApiUrl = os.getenv('web_api_url', 'http://webapi:8080')
+rabbitHost = os.getenv('rabbit_host', 'amqp://ambar')
ocrPdfSymbolsPerPageThreshold = int(os.getenv('ocrPdfSymbolsPerPageThreshold', 1000))
ocrPdfMaxPageCount = int(os.getenv('ocrPdfMaxPageCount', 5))
preserveOriginals = os.getenv('preserveOriginals', 'False') == 'True'
-args = parser.parse_args()
-
# instantiating Api proxy
-apiProxy = ApiProxy(args.api_url, API_CALL_TIMEOUT_SECONDS)
+apiProxy = ApiProxy(apiUrl, webApiUrl, API_CALL_TIMEOUT_SECONDS)
# instantiating logger
-logger = AmbarLogger(apiProxy, args.id)
+logger = AmbarLogger(apiProxy, pipelineId)
# instantiating ArchiveProcessor
archiveProcessor = ArchiveProcessor(logger, apiProxy)
# instantiating PstProcessor
@@ -56,11 +53,11 @@
logger.LogMessage('info', 'started')
# connecting to Rabbit
-logger.LogMessage(
- 'info', 'connecting to Rabbit {0}...'.format(args.rabbit_host))
+logger.LogMessage('info', 'connecting to Rabbit {0}...'.format(rabbitHost))
+
try:
rabbitConnection = pika.BlockingConnection(pika.URLParameters(
- '{0}?heartbeat={1}'.format(args.rabbit_host, RABBIT_HEARTBEAT)))
+ '{0}?heartbeat={1}'.format(rabbitHost, RABBIT_HEARTBEAT)))
rabbitChannel = rabbitConnection.channel()
rabbitChannel.basic_qos(prefetch_count=1, all_channels=True)
logger.LogMessage('info', 'connected to Rabbit!')
@@ -72,17 +69,87 @@
logger.LogMessage('info', 'waiting for messages...')
-def ProcessFile(sha, fileId, meta, sourceId):
+def ProcessFile(message):
try:
- logger.LogMessage('verbose', 'task received {0}'.format(sha))
+ meta = message['meta']
+ event = message['event']
+ sha = None
+
+ logger.LogMessage('verbose', '{0} task received for {1}'.format(event, meta['full_name']))
+
+ if ('sha' in message):
+ sha = message['sha']
+
+ fileId = sha256('{0}{1}'.format(meta['source_id'],meta['full_name']).encode('utf-8')).hexdigest()
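+ # same derivation as ServiceApi's generateFileId (sha256 of source_id + full_name), so ids line up across services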
- fileMeta = AmbarFileMeta.InitFromDictWithId(meta)
+ if (event == 'unlink'):
+ apiResp = apiProxy.HideFile(fileId)
+
+ if not apiResp.Success:
+ logger.LogMessage('error', 'error hiding file {0} {1}'.format(meta['full_name'], apiResp.message))
+ return False
+
+ if apiResp.Ok:
+ logger.LogMessage('verbose', 'removed {0}'.format(meta['full_name']))
+ return True
+
+ if not apiResp.NotFound:
+ logger.LogMessage('error', 'error hiding file {0} {1} code: {2}'.format(meta['full_name'], apiResp.message, apiResp.code))
+ return False
+
+ return True
+
+ if (event != 'add' and event != 'change'):
+ print('Ignoring {0}'.format(event))
+ return True
+
+ apiResp = apiProxy.CheckIfMetaExists(meta)
+
+ if not apiResp.Success:
+ logger.LogMessage('error', 'error checking meta existence for {0} {1}'.format(meta['full_name'], apiResp.message))
+ return False
+
+ if apiResp.Ok:
+ logger.LogMessage('verbose', 'meta found for {0}'.format(meta['full_name']))
+ return True
+
+ if not apiResp.NotFound:
+ logger.LogMessage('error', 'error checking meta existence for {0} {1} {2}'.format(meta['full_name'], apiResp.code, apiResp.message))
+ return False
+
+ apiResp = apiProxy.UnhideFile(fileId)
+
+ if not apiResp.Success:
+ logger.LogMessage('error', 'error unhiding file {0} {1}'.format(meta['full_name'], apiResp.message))
+ return False
+
+ if not (apiResp.Ok or apiResp.NotFound):
+ logger.LogMessage('error', 'error unhiding file, unexpected response code {0} {1} {2}'.format(meta['full_name'], apiResp.code, apiResp.message))
+ return False
+
+ fileMeta = AmbarFileMeta.Init(meta)
if not fileMeta.initialized:
- logger.LogMessage(
- 'error', 'error initializing file meta {0}'.format(fileMeta.message))
+ logger.LogMessage('error', 'error initializing file meta {0}'.format(fileMeta.message))
+ return False
+
+ if (sha):
+ apiResp = apiProxy.DownloadFileBySha(sha)
+ else:
+ apiResp = apiProxy.DownloadFile(fileMeta.full_name)
+
+ if not apiResp.Success:
+ logger.LogMessage('error', 'error downloading file {0} {1}'.format(fileMeta.full_name, apiResp.message))
+ return False
+
+ if not apiResp.Ok:
+ logger.LogMessage('error', 'error downloading file {0} {1} code: {2}'.format(fileMeta.full_name, apiResp.message, apiResp.code))
return False
+ fileData = apiResp.payload
+
+ sha = sha256(fileData).hexdigest()
+
hasParsedContent = False
fileContent = {}
@@ -125,42 +192,13 @@ def ProcessFile(sha, fileId, meta, sourceId):
'verbose', 'parsed content found {0}'.format(fileMeta.full_name))
if not hasParsedContent:
- apiResp = apiProxy.GetFileContent(sha)
-
- if not apiResp.Success:
- logger.LogMessage('error', 'error retrieving file content {0} {1}'.format(
- fileMeta.full_name, apiResp.message))
- return False
-
- if apiResp.NotFound:
- logger.LogMessage(
- 'verbose', 'file content not found {0}'.format(fileMeta.full_name))
- return False
-
- if not apiResp.Ok:
- logger.LogMessage('error', 'error retrieving file content {0} {1} {2}'.format(
- fileMeta.full_name, apiResp.code, apiResp.message))
- return False
-
- # file received
- fileData = apiResp.payload
- logger.LogMessage(
- 'verbose', 'file content received {0}'.format(fileMeta.full_name))
-
- # checking received sha with calculated payload sha
- calculatedSha = sha256(fileData).hexdigest()
- if (calculatedSha != sha):
- logger.LogMessage('error', 'calculated sha ({0}) is not equal to received sha ({1}) for {2}'.format(
- calculatedSha, sha, fileMeta.full_name))
- return False
-
# checking if file is archive
if ContentTypeAnalyzer.IsArchive(fileMeta.short_name):
- archiveProcessor.Process(fileData, fileMeta, sourceId)
+ archiveProcessor.Process(fileData, fileMeta, fileMeta.source_id)
# checking if file is pst
if ContentTypeAnalyzer.IsPst(fileMeta.short_name):
- pstProcessor.Process(fileData, fileMeta, sourceId)
+ pstProcessor.Process(fileData, fileMeta, fileMeta.source_id)
# extracting
logger.LogMessage('verbose', 'parsing {0}'.format(fileMeta.full_name))
@@ -205,13 +243,11 @@ def ProcessFile(sha, fileId, meta, sourceId):
sha, fileContent.text.encode(encoding='utf_8', errors='ignore'))
if not apiResp.Success:
- logger.LogMessage('error', 'error submitting parsed text to Api {0} {1}'.format(
- fileMeta.full_name, apiResp.message))
+ logger.LogMessage('error', 'error submitting parsed text to Api {0} {1}'.format(fileMeta.full_name, apiResp.message))
return False
if not apiResp.Ok:
- logger.LogMessage('error', 'error submitting parsed text to Api, unexpected response code {0} {1} {2}'.format(
- fileMeta.full_name, apiResp.code, apiResp.message))
+ logger.LogMessage('error', 'error submitting parsed text to Api, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
return False
logger.LogMessage('verbose', 'parsed text submitted {0}'.format(fileMeta.full_name))
@@ -226,8 +262,7 @@ def ProcessFile(sha, fileId, meta, sourceId):
ambarFile['file_id'] = fileId
ambarFile['indexed_datetime'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
- apiResp = apiProxy.SubmitProcessedFile(fileId, json.dumps(
- dict(ambarFile)).encode(encoding='utf_8', errors='ignore'))
+ apiResp = apiProxy.SubmitProcessedFile(fileId, json.dumps(dict(ambarFile)).encode(encoding='utf_8', errors='ignore'))
if not apiResp.Success:
logger.LogMessage('error', 'error submitting parsed content to Api {0} {1}'.format(
@@ -239,21 +274,28 @@ def ProcessFile(sha, fileId, meta, sourceId):
fileMeta.full_name, apiResp.code, apiResp.message))
return False
- logger.LogMessage(
- 'verbose', 'parsed content submited {0}'.format(fileMeta.full_name))
+ logger.LogMessage('verbose', 'parsed content submitted {0}'.format(fileMeta.full_name))
+
+ apiResp = apiProxy.AddMetaIdToCache(fileMeta.id)
+
+ if not apiResp.Success:
+ logger.LogMessage('error', 'error adding meta id to cache {0} {1}'.format(fileMeta.full_name, apiResp.message))
+ return False
+
+ if not apiResp.Ok:
+ logger.LogMessage('error', 'error adding meta id to cache, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
+ return False
# removing original file
if not preserveOriginals:
apiResp = apiProxy.RemoveFileContent(sha)
if not apiResp.Success:
- logger.LogMessage('error', 'error removing original file from Ambar for {0} {1}'.format(
- fileMeta.full_name, apiResp.message))
+ logger.LogMessage('error', 'error removing original file from Ambar for {0} {1}'.format(fileMeta.full_name, apiResp.message))
return False
if not (apiResp.Ok or apiResp.NotFound):
- logger.LogMessage('error', 'error removing original file from Ambar for {0}, unexpected response code {1} {2}'.format(
- fileMeta.full_name, apiResp.code, apiResp.message))
+ logger.LogMessage('error', 'error removing original file from Ambar for {0}, unexpected response code {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
return False
if apiResp.Ok:
@@ -263,31 +305,25 @@ def ProcessFile(sha, fileId, meta, sourceId):
## tags
apiResp = apiProxy.RemoveAutoTags(fileId)
if not apiResp.Success:
- logger.LogMessage('error', 'error removing autotags {0} {1}'.format(
- fileMeta.full_name, apiResp.message))
+ logger.LogMessage('error', 'error removing autotags {0} {1}'.format(fileMeta.full_name, apiResp.message))
return False
if not apiResp.Ok:
- logger.LogMessage('error', 'error removing autotags, unexpected response code {0} {1} {2}'.format(
- fileMeta.full_name, apiResp.code, apiResp.message))
+ logger.LogMessage('error', 'error removing autotags, unexpected response code {0} {1} {2}'.format(fileMeta.full_name, apiResp.code, apiResp.message))
return False
autoTagger.AutoTagAmbarFile(ambarFile)
return True
except Exception as e:
- logger.LogMessage('error', 'error processing task {0} {1}'.format(sha, repr(e)))
+ logger.LogMessage('error', 'error processing task {0}'.format(repr(e)))
return False
# main callback on receiving message from Rabbit
def RabbitConsumeCallback(channel, method, properties, body):
- bodyObject = json.loads(body.decode('utf-8'))
- sha = bodyObject['sha']
- fileId = bodyObject['fileId']
- meta = bodyObject['meta']
- sourceId = bodyObject['sourceId']
- if (ProcessFile(sha, fileId, meta, sourceId)):
+ message = json.loads(body.decode('utf-8'))
+ if (ProcessFile(message)):
channel.basic_ack(delivery_tag=method.delivery_tag)
else:
channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
diff --git a/README.md b/README.md
index 97064cf..9a9cef6 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-[![Version](https://img.shields.io/badge/Version-v2.0.0rc-brightgreen.svg)](https://ambar.cloud)
+[![Version](https://img.shields.io/badge/Version-v2.1.8-brightgreen.svg)](https://ambar.cloud)
[![License](https://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/RD17/ambar/blob/master/License.txt)
[![Blog](https://img.shields.io/badge/Ambar%20Blog-%20Latest%20news%20and%20tutorials%20-brightgreen.svg)](https://blog.ambar.cloud)
@@ -49,6 +49,7 @@ Crawling is automatic, no schedule is needed since the crawler monitors fs event
* OpenOffice documents
* RTF, Plaintext
* HTML / XHTML
+* Multithreaded processing (EE only)
## Installation
@@ -94,6 +95,10 @@ It's limited by amount of RAM on your machine, typically it's 500MB. It's an awe
### I have a problem, what should I do?
Request a dedicated support session by mailing us at hello@ambar.cloud
+## Sponsors
+
+- [IFIC.co.uk](http://www.ific.co.uk/)
+
## Change Log
[Change Log](https://github.com/RD17/ambar/blob/master/CHANGELOG.md)
@@ -101,4 +106,4 @@ Request a dedicated support session by mailing us on hello@ambar.cloud
[Privacy Policy](https://github.com/RD17/ambar/blob/master/privacy-policy.md)
## License
-[MIT License](https://github.com/RD17/ambar/blob/master/License.txt)
+[MIT License](https://github.com/RD17/ambar/blob/master/license.txt)
diff --git a/ServiceApi/src/api/files.js b/ServiceApi/src/api/files.js
index 95a76b9..0fe19f9 100644
--- a/ServiceApi/src/api/files.js
+++ b/ServiceApi/src/api/files.js
@@ -59,6 +59,17 @@ export default ({ storage }) => {
.catch(next)
})
+ /**
+ * Cache processed meta id
+ */
+ api.post('/meta/:metaId/processed', (req, res) => {
+ const { params: { metaId } } = req
+
+ CacheProxy.addMetaId(storage.redis, metaId)
+
+ res.sendStatus(200)
+ })
+
/**
* Enqueue meta for specified sha (enqueuing message to pipeline)
*/
@@ -77,9 +88,9 @@ export default ({ storage }) => {
return
}
- QueueProxy.enqueuePipelineMessage(storage, { sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta })
+ QueueProxy.enqueuePipelineMessage(storage, { event: 'add', sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta })
.then(() => {
- CacheProxy.addMetaId(storage.redis, meta.id)
+ //CacheProxy.addMetaId(storage.redis, meta.id)
res.sendStatus(200)
})
.catch(next)
diff --git a/ServiceApi/src/services/QueueProxy.js b/ServiceApi/src/services/QueueProxy.js
index 6e293bb..6935fd4 100644
--- a/ServiceApi/src/services/QueueProxy.js
+++ b/ServiceApi/src/services/QueueProxy.js
@@ -9,11 +9,6 @@ export const AMBAR_PIPELINE_WAITING_EXCHANGE = "AMBAR_PIPELINE_WAITING_EXCHANGE"
export const AMBAR_PIPELINE_WAITING_QUEUE_TTL = 60 * 60 * 1000
-export const AMBAR_CRAWLER_QUEUE = "AMBAR_CRAWLER_QUEUE"
-export const AMBAR_CRAWLER_EXCHANGE = "AMBAR_CRAWLER_EXCHANGE"
-
-export const AMBAR_CRAWLER_MESSAGE_DEFAULT_TTL = 10 * 1000
-
const getPipelineMessagePriority = (storage, fileName) => new Promise((resolve) => {
const regex = /(\.jp[e]*g$)|(\.png$)|(\.bmp$)|(\.tif[f]*$)|(\.pdf$)/i
const priority = regex.test(fileName) ? 1 : 2
@@ -36,17 +31,6 @@ export const enqueuePipelineMessage = (storage, message) => new Promise((resolve
.catch(err => reject(err))
})
-export const enqueueCrawlerMessage = (storage, message, ttl = AMBAR_CRAWLER_MESSAGE_DEFAULT_TTL) => new Promise((resolve, reject) => {
- storage.rabbit.createConfirmChannel()
- .then(channel => {
- channel.publish(AMBAR_CRAWLER_EXCHANGE, '', Buffer.from(JSON.stringify(message)), { expiration: ttl })
- return channel.waitForConfirms()
- .then(() => channel.close())
- })
- .then(() => resolve())
- .catch(err => reject(err))
-})
-
export const initRabbit = new Promise((resolve, reject) => {
amqp.connect(`${config.rabbitHost}?heartbeat=60`)
.then((conn) => {
@@ -67,11 +51,7 @@ export const initRabbit = new Promise((resolve, reject) => {
.then(() => channel.bindQueue(AMBAR_PIPELINE_QUEUE,
AMBAR_PIPELINE_EXCHANGE))
.then(() => channel.bindQueue(AMBAR_PIPELINE_WAITING_QUEUE,
- AMBAR_PIPELINE_WAITING_EXCHANGE))
- .then(() => channel.assertExchange(AMBAR_CRAWLER_EXCHANGE, 'fanout', { durable: false }))
- .then(() => channel.assertQueue(AMBAR_CRAWLER_QUEUE, { durable: false }))
- .then(() => channel.bindQueue(AMBAR_CRAWLER_QUEUE,
- AMBAR_CRAWLER_EXCHANGE))
+ AMBAR_PIPELINE_WAITING_EXCHANGE))
.then(() => channel.close())
)
.then(() => resolve(conn))
diff --git a/ServiceApi/src/utils/MetaBuilder.js b/ServiceApi/src/utils/MetaBuilder.js
index c6612f3..bd8fecc 100644
--- a/ServiceApi/src/utils/MetaBuilder.js
+++ b/ServiceApi/src/utils/MetaBuilder.js
@@ -20,8 +20,8 @@ export const buildMeta = (data) => {
const meta = {
id: generateMetaId(source_id, full_name, created_datetime, updated_datetime),
- short_name: short_name.toLowerCase(),
- full_name: full_name.toLowerCase(),
+ short_name: short_name,
+ full_name: full_name,
source_id: source_id,
extension: extension,
created_datetime: created_datetime,
@@ -35,8 +35,8 @@ export const buildMeta = (data) => {
export const buildShortMeta = (shortName, sourceId) => {
- const short_name = shortName.toLowerCase()
- const full_name = `//${sourceId.toLowerCase()}/${shortName.toLowerCase()}`
+ const short_name = shortName
+ const full_name = `//${sourceId}/${shortName}`
const source_id = sourceId
let extension = ''
let calculatedExtension = FILE_EXTENSION_REGEX.exec(short_name)
diff --git a/WebApi/package.json b/WebApi/package.json
index cb90b26..00d93c9 100644
--- a/WebApi/package.json
+++ b/WebApi/package.json
@@ -1,6 +1,6 @@
{
"name": "ambar-webapi",
- "version": "1.3.0",
+ "version": "2.1.0",
"description": "Ambar WebAPI",
"main": "dist",
"scripts": {
@@ -35,6 +35,7 @@
"eslint-plugin-promise": "^3.3.0",
"express": "^4.13.3",
"gridfs-stream": "^1.1.1",
+ "idempotent-babel-polyfill": "^0.1.1",
"minimist": "^1.2.0",
"moment": "^2.15.0",
"mongodb": "^2.2.10",
diff --git a/WebApi/src/api/files.js b/WebApi/src/api/files.js
index 4ba8d45..092a256 100644
--- a/WebApi/src/api/files.js
+++ b/WebApi/src/api/files.js
@@ -3,14 +3,20 @@ import ErrorResponse from '../utils/ErrorResponse'
import {
CryptoService,
EsProxy,
- CacheProxy,
GridFsProxy,
MongoProxy,
- FileUploader,
- QueueProxy
+ FileUploader,
+ QueueProxy,
+ CacheProxy
} from '../services'
+
import * as MetaBuilder from '../utils/MetaBuilder'
+import config from '../config'
+import request from 'request'
+
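+// Crawler download paths look like //<crawlerName>/<filePath>; capture both parts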
+const DOWNLOAD_URL_REGEX = /^\/\/([^/]+)\/(.*)$/i
+
const generateFileId = (source_id, full_name) => {
return CryptoService.getSha256(`${source_id}${full_name}`)
}
@@ -20,6 +26,58 @@ const generateExtractedTextFileName = (sha) => `text_${sha}`
export default ({ storage }) => {
let api = Router()
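+ // New download endpoint: serves content by sha straight from GridFS,
+ // or proxies path-based requests to the crawler that owns the file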
+ api.get('/download', (req, res, next) => {
+ const filePath = req.query.path
+ const sha = req.query.sha
+
+ if (!filePath && !sha) {
+ res.sendStatus(400)
+ return
+ }
+
+ if (sha) {
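+ // sha supplied: stream the stored content out of GridFS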
+ GridFsProxy.checkIfFileExists(storage.mongoDb, sha)
+ .then(fileExists => {
+ if (!fileExists) {
+ res.status(404).json(new ErrorResponse('File content not found'))
+ return
+ }
+
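+ // filename*=UTF-8'' keeps non-ASCII file names intact (RFC 5987)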
+ res.writeHead(200, {
+ 'Content-Type': 'application/octet-stream',
+ 'Content-Disposition': `attachment; filename*=UTF-8''${encodeURIComponent(sha)}`
+ })
+
+ GridFsProxy
+ .downloadFile(storage.mongoDb, sha)
+ .on('error', (err) => {
+ console.error('error while downloading by sha', err)
+ res.end()
+ })
+ .pipe(res)
+ })
+ .catch(next)
+
+ } else if (filePath) {
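+ // Path-based download: proxy the request to the crawler's own download API over the internal network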
+ const match = DOWNLOAD_URL_REGEX.exec(filePath)
+
+ if (!match) {
+ res.sendStatus(400)
+ return
+ }
+
+ const { 1: crawlerName, 2: crawlerFilePath } = match
+
+ request
+ .get(`http://${crawlerName}:${config.crawlerPort}/api/download?path=${encodeURIComponent(crawlerFilePath)}`)
+ .on('error', (err) => {
+ console.error('error while downloading by path', err)
+ res.end()
+ })
+ .pipe(res)
+ }
+ })
+
//////////////// CALLED FROM UI ///////////////////////////////////////////
/**
* @api {get} api/files/:uri Download File Content by Secure Uri
@@ -186,9 +244,9 @@ export default ({ storage }) => {
return GridFsProxy.uploadFile(storage.mongoDb, sha, fileContent)
}
})
- .then(() => QueueProxy.enqueuePipelineMessage(storage, { sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta }))
+ .then(() => QueueProxy.enqueuePipelineMessage(storage, { event: 'add', sha: sha, fileId: generateFileId(meta.source_id, meta.full_name), sourceId: sourceId, meta: meta }))
.then(() => {
- CacheProxy.addMetaId(storage.redis, meta.id)
+ // CacheProxy.addMetaId(storage.redis, meta.id)
res.status(200).json({ fileId: generateFileId(meta.source_id, meta.full_name) })
})
.catch(next)
@@ -211,14 +269,16 @@ export default ({ storage }) => {
api.put('/hide/:fileId', (req, res, next) => {
const fileId = req.params.fileId
- EsProxy.checkIfFileExists(storage.elasticSearch, fileId)
- .then(fileExists => {
- if (!fileExists) {
+ EsProxy.getFileByFileId(storage.elasticSearch, fileId)
+ .then(file => {
+ if (!file) {
res.sendStatus(404)
return
}
- return EsProxy.hideFile(storage.elasticSearch, fileId)
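+ // Evict the cached meta id, presumably so a later re-crawl can re-submit the file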
+ CacheProxy.removeMetaId(storage.redis, file.meta.id)
+
+ return EsProxy.hideFile(storage.elasticSearch, file.file_id)
.then(() => res.sendStatus(200))
})
.catch(next)
@@ -241,25 +301,16 @@ export default ({ storage }) => {
api.put('/unhide/:fileId', (req, res, next) => {
const fileId = req.params.fileId
- EsProxy.checkIfFileExists(storage.elasticSearch, fileId)
- .then(fileExists => {
- if (!fileExists) {
+ EsProxy.unHideFile(storage.elasticSearch, fileId)
+ .then(() => res.sendStatus(200))
+ .catch(err => {
+ if (err.statusCode && err.statusCode === 404) {
res.sendStatus(404)
return
}
- return EsProxy.unHideFile(storage.elasticSearch, fileId)
- .then(() => res.sendStatus(200))
- .catch(err => {
- if ((err.statusCode) && (err.statusCode == 404)) {
- res.sendStatus(200)
- return
- }
-
- throw new Error(err)
- })
+ next(err)
})
- .catch(next)
})
return api
diff --git a/WebApi/src/config.js b/WebApi/src/config.js
index 21aec76..d0eb3dc 100644
--- a/WebApi/src/config.js
+++ b/WebApi/src/config.js
@@ -13,6 +13,7 @@ const defaultConfig = {
"rabbitHost": "amqp://ambar",
"uiLang": "en",
"analyticsToken": "",
+ "crawlerPort": 8082
}
const intParamsList = ['localPort']
diff --git a/WebApi/src/index.js b/WebApi/src/index.js
index bc9180f..45180bd 100644
--- a/WebApi/src/index.js
+++ b/WebApi/src/index.js
@@ -1,3 +1,4 @@
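+// idempotent-babel-polyfill loads babel-polyfill at most once, avoiding duplicate-import errors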
+import 'idempotent-babel-polyfill'
import http from 'http'
import express from 'express'
import cors from 'cors'
diff --git a/WebApi/src/services/CacheProxy.js b/WebApi/src/services/CacheProxy.js
index ef5a322..cf994d8 100644
--- a/WebApi/src/services/CacheProxy.js
+++ b/WebApi/src/services/CacheProxy.js
@@ -3,6 +3,7 @@ import { EsProxy, DateTimeService } from './index'
const TAGS_HASH_NAME = 'tags'
export const addMetaId = (redis, metaId) => { redis.set(`meta:${metaId}`, DateTimeService.getCurrentDateTime()) }
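+// Companion to addMetaId: drops the cached meta id (used when a file is hidden)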
+export const removeMetaId = (redis, metaId) => { redis.del(`meta:${metaId}`) }
export const checkIfTokenExists = (redis, token) => redis.getAsync(token)
export const addToken = (redis, token, ttlSeconds) => {
diff --git a/WebApi/src/services/QueueProxy.js b/WebApi/src/services/QueueProxy.js
index d401139..dcac1bf 100644
--- a/WebApi/src/services/QueueProxy.js
+++ b/WebApi/src/services/QueueProxy.js
@@ -10,7 +10,7 @@ const getPipelineMessagePriority = (storage, fileName) => new Promise((resolve)
})
export const enqueuePipelineMessage = (storage, message) => new Promise((resolve, reject) => {
- const fileName = message.meta.short_name
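+ // Messages may now carry an explicit fileName; fall back to meta.short_name for priority calculation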
+ const fileName = message.fileName || message.meta.short_name
storage.rabbit.createConfirmChannel()
.then(channel => {
diff --git a/WebApi/src/utils/MetaBuilder.js b/WebApi/src/utils/MetaBuilder.js
index f0db09f..50308d8 100644
--- a/WebApi/src/utils/MetaBuilder.js
+++ b/WebApi/src/utils/MetaBuilder.js
@@ -8,8 +8,8 @@ const generateMetaId = (source_id, full_name, created_datetime, updated_datetime
export const buildShortMeta = (shortName, sourceId) => {
- const short_name = shortName.toLowerCase()
- const full_name = `//${sourceId.toLowerCase()}/${shortName.toLowerCase()}`
+ const short_name = shortName
+ const full_name = `//${sourceId}/${shortName}`
const source_id = sourceId
let extension = ''
let calculatedExtension = FILE_EXTENSION_REGEX.exec(short_name)
diff --git a/WebApi/yarn.lock b/WebApi/yarn.lock
index 4f16ebd..48a44de 100644
--- a/WebApi/yarn.lock
+++ b/WebApi/yarn.lock
@@ -707,6 +707,14 @@ babel-polyfill@^6.16.0:
core-js "^2.4.0"
regenerator-runtime "^0.9.5"
+babel-polyfill@^6.26.0:
+ version "6.26.0"
+ resolved "http://192.168.1.113:4873/babel-polyfill/-/babel-polyfill-6.26.0.tgz#379937abc67d7895970adc621f284cd966cf2153"
+ dependencies:
+ babel-runtime "^6.26.0"
+ core-js "^2.5.0"
+ regenerator-runtime "^0.10.5"
+
babel-preset-es2015@^6.9.0:
version "6.18.0"
resolved "https://registry.yarnpkg.com/babel-preset-es2015/-/babel-preset-es2015-6.18.0.tgz#b8c70df84ec948c43dcf2bf770e988eb7da88312"
@@ -790,6 +798,13 @@ babel-runtime@^6.0.0, babel-runtime@^6.11.6, babel-runtime@^6.9.0, babel-runtime
core-js "^2.4.0"
regenerator-runtime "^0.9.5"
+babel-runtime@^6.26.0:
+ version "6.26.0"
+ resolved "http://192.168.1.113:4873/babel-runtime/-/babel-runtime-6.26.0.tgz#965c7058668e82b55d7bfe04ff2337bc8b5647fe"
+ dependencies:
+ core-js "^2.4.0"
+ regenerator-runtime "^0.11.0"
+
babel-template@^6.14.0, babel-template@^6.15.0, babel-template@^6.16.0, babel-template@^6.8.0:
version "6.16.0"
resolved "https://registry.yarnpkg.com/babel-template/-/babel-template-6.16.0.tgz#e149dd1a9f03a35f817ddbc4d0481988e7ebc8ca"
@@ -1109,6 +1124,10 @@ core-js@^2.4.0:
version "2.4.1"
resolved "https://registry.yarnpkg.com/core-js/-/core-js-2.4.1.tgz#4de911e667b0eae9124e34254b53aea6fc618d3e"
+core-js@^2.5.0:
+ version "2.5.6"
+ resolved "http://192.168.1.113:4873/core-js/-/core-js-2.5.6.tgz#0fe6d45bf3cac3ac364a9d72de7576f4eb221b9d"
+
core-util-is@~1.0.0:
version "1.0.2"
resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.2.tgz#b5fd54220aa2bc5ab57aab7140c940754503c1a7"
@@ -1785,6 +1804,12 @@ iconv-lite@^0.4.17:
dependencies:
safer-buffer "^2.1.0"
+idempotent-babel-polyfill@^0.1.1:
+ version "0.1.1"
+ resolved "http://192.168.1.113:4873/idempotent-babel-polyfill/-/idempotent-babel-polyfill-0.1.1.tgz#f85d2ecaf36b05a652457b25c804289512360d94"
+ dependencies:
+ babel-polyfill "^6.26.0"
+
ignore-by-default@^1.0.0:
version "1.0.1"
resolved "https://registry.yarnpkg.com/ignore-by-default/-/ignore-by-default-1.0.1.tgz#48ca6d72f6c6a3af00a9ad4ae6876be3889e2b09"
@@ -2703,6 +2728,14 @@ regenerate@^1.2.1:
version "1.3.2"
resolved "https://registry.yarnpkg.com/regenerate/-/regenerate-1.3.2.tgz#d1941c67bad437e1be76433add5b385f95b19260"
+regenerator-runtime@^0.10.5:
+ version "0.10.5"
+ resolved "http://192.168.1.113:4873/regenerator-runtime/-/regenerator-runtime-0.10.5.tgz#336c3efc1220adcedda2c9fab67b5a7955a33658"
+
+regenerator-runtime@^0.11.0:
+ version "0.11.1"
+ resolved "http://192.168.1.113:4873/regenerator-runtime/-/regenerator-runtime-0.11.1.tgz#be05ad7f9bf7d22e056f9726cee5017fbf19e2e9"
+
regenerator-runtime@^0.9.5:
version "0.9.6"
resolved "https://registry.yarnpkg.com/regenerator-runtime/-/regenerator-runtime-0.9.6.tgz#d33eb95d0d2001a4be39659707c51b0cb71ce029"
diff --git a/docker-compose.yml b/docker-compose.yml
index 6a04d55..8210e95 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,18 +12,14 @@ services:
volumes:
- ${dataPath}/db:/data/db
expose:
- - "27017"
- ports:
- - "27017:27017"
+ - "27017"
es:
restart: always
networks:
- internal_network
image: ambar/ambar-es:latest
expose:
- - "9200"
- ports:
- - "9200:9200"
+ - "9200"
environment:
- cluster.name=ambar-es
- ES_JAVA_OPTS=-Xms2g -Xmx2g
@@ -46,10 +42,7 @@ services:
hostname: rabbit
expose:
- "15672"
- - "5672"
- ports:
- - "15672:15672"
- - "5672:5672"
+ - "5672"
volumes:
- ${dataPath}/rabbit:/var/lib/rabbitmq
redis:
@@ -60,9 +53,7 @@ services:
- internal_network
image: ambar/ambar-redis:latest
expose:
- - "6379"
- ports:
- - "6379:6379"
+ - "6379"
serviceapi:
depends_on:
redis:
@@ -78,18 +69,14 @@ services:
- internal_network
image: ambar/ambar-serviceapi:latest
expose:
- - "8081"
- ports:
- - "8081:8081"
+ - "8081"
environment:
- mongoDbUrl=mongodb://db:27017/ambar_data
- elasticSearchUrl=http://es:9200
- redisHost=redis
- redisPort=6379
- rabbitHost=amqp://rabbit
- - langAnalyzer=${langAnalyzer}
- volumes:
- - /var/run/docker.sock:/var/run/docker.sock
+ - langAnalyzer=${langAnalyzer}
webapi:
depends_on:
serviceapi:
@@ -110,9 +97,7 @@ services:
- redisHost=redis
- redisPort=6379
- serviceApiUrl=http://serviceapi:8081
- - rabbitHost=amqp://rabbit
- volumes:
- - /var/run/docker.sock:/var/run/docker.sock
+ - rabbitHost=amqp://rabbit
frontend:
depends_on:
webapi:
@@ -139,7 +124,7 @@ services:
- id=0
- api_url=http://serviceapi:8081
- rabbit_host=amqp://rabbit
- crawler0:
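+ # Crawler service name is now parameterised via ${crawlerName}; WebApi reaches it on port 8082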
+ ${crawlerName}:
depends_on:
serviceapi:
condition: service_healthy
@@ -147,9 +132,9 @@ services:
restart: always
networks:
- internal_network
- environment:
- - apiUrl=http://serviceapi:8081
- - crawlPath=/usr/data
+ expose:
+ - "8082"
+ environment:
- name=${crawlerName}
volumes:
- ${pathToCrawl}:/usr/data